http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 1e12d4c..0d5c874 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -22,14 +22,21 @@ import java.util.LinkedList; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; import org.junit.After; import org.junit.Before; @@ -39,6 +46,10 @@ import org.junit.Before; */ public class AmqpClientTestSupport extends ActiveMQTestBase { + protected static Symbol SHARED = Symbol.getSymbol("shared"); + protected static Symbol GLOBAL = Symbol.getSymbol("global"); + + private boolean useSSL; protected JMSServerManager serverManager; @@ -86,6 +97,12 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { ActiveMQServer server = createServer(true, true); serverManager = new JMSServerManagerImpl(server); Configuration serverConfig = server.getConfiguration(); + CoreAddressConfiguration address = new CoreAddressConfiguration(); + address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST); + CoreQueueConfiguration queueConfig = new CoreQueueConfiguration(); + queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST); + address.getQueueConfigurations().add(queueConfig); + serverConfig.addAddressConfiguration(address); serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ"))); serverConfig.setSecurityEnabled(false); serverManager.start(); @@ -179,4 +196,19 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception { return new AmqpClient(brokerURI, username, password); } + + + protected void sendMessages(int numMessages, String address) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(address); + for (int i = 0; i < numMessages; i++) { + AmqpMessage message = new AmqpMessage(); + message.setText("message-" + i); + sender.send(message); + } + sender.close(); + connection.connect(); + } }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java index abc422b..e760d77 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java @@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpFrameValidator; @@ -54,7 +56,8 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { @Override public void setUp() throws Exception { super.setUp(); - server.createQueue(new SimpleString(getTopicName()), new SimpleString(getTopicName()), null, true, false); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST)); + server.createQueue(new SimpleString(getTopicName()), RoutingType.MULTICAST, new SimpleString(getTopicName()), null, true, false); } @Test(timeout = 60000) @@ -371,6 +374,6 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { } public String getTopicName() { - return "topic://myTopic"; + return "myTopic"; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java index c599f38..4dbe21e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java @@ -111,8 +111,6 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport { sender.close(); - Thread.sleep(200); - queueView = getProxyToQueue(remoteTarget.getAddress()); assertNull(queueView); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index e42a718..1708720 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -20,7 +20,6 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.util.ArrayList; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -28,7 +27,6 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; -import org.junit.Before; import org.junit.Test; /** @@ -36,11 +34,6 @@ import org.junit.Test; */ public class AmqpTransactionTest extends AmqpClientTestSupport { - @Before - public void createQueue() throws Exception { - server.createQueue(SimpleString.toSimpleString(getTestName()), SimpleString.toSimpleString(getTestName()), null, true, false); - } - @Test(timeout = 30000) public void testBeginAndCommitTransaction() throws Exception { AmqpClient client = createAmqpClient(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java new file mode 100644 index 0000000..db2f1b4 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + + +public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { + + SimpleString address = new SimpleString("testAddress"); + SimpleString queue1 = new SimpleString("queue1"); + SimpleString queue2 = new SimpleString("queue2"); + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount()); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressSameNameMultipleQueues() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); + server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); + + sendMessages(2, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount()); + assertEquals(0, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount()); + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressDifferentName() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount()); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressDifferentNameMultipleQueues() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); + server.createQueue(address, RoutingType.ANYCAST, queue2, null, true, false); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount()); + assertEquals(0, ((QueueImpl)server.getPostOffice().getBinding(queue2).getBindable()).getConsumerCount()); + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeFromSingleQualifiedQueueOnAddressSameName() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString() + "::" + queue1.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount()); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeWhenOnlyMulticast() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + AmqpSession session = connection.createSession(); + Source jmsSource = createJmsSource(false); + jmsSource.setAddress(address.toString()); + try { + session.createReceiver(jmsSource); + fail("should throw exception"); + } catch (Exception e) { + //ignore + } + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeWhenNoAddressCreatedNoAutoCreate() throws Exception { + AddressSettings settings = new AddressSettings(); + settings.setAutoCreateAddresses(false); + server.getAddressSettingsRepository().addMatch(address.toString(), settings); + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + try { + session.createReceiver(address.toString()); + fail("should throw exception"); + } catch (Exception e) { + //ignore + } + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeWhenNoAddressCreatedAutoCreate() throws Exception { + AddressSettings settings = new AddressSettings(); + settings.setAutoCreateAddresses(true); + server.getAddressSettingsRepository().addMatch(address.toString(), settings); + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(address.toString()); + sendMessages(1, address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount()); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeWhenNoAddressHasBothRoutingTypesButDefaultQueueIsMultiCast() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.ANYCAST); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + server.createQueue(address, RoutingType.MULTICAST, address, null, true, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + try { + session.createReceiver(address.toString()); + fail("expected exception"); + } catch (Exception e) { + //ignore + } + connection.close(); + } + + + protected Source createJmsSource(boolean topic) { + + Source source = new Source(); + // Set the capability to indicate the node type being created + if (!topic) { + source.setCapabilities(QUEUE_CAPABILITY); + } else { + source.setCapabilities(TOPIC_CAPABILITY); + } + + return source; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java new file mode 100644 index 0000000..6a114d7 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + + +public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { + + SimpleString address = new SimpleString("testAddress"); + SimpleString queue1 = new SimpleString("queue1"); + SimpleString queue2 = new SimpleString("queue2"); + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); + server.createQueue(address, RoutingType.MULTICAST, address, null, true, false); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString() + "::" + address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount()); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeWhenOnlyAnycast() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + AmqpSession session = connection.createSession(); + Source jmsSource = createJmsSource(true); + jmsSource.setAddress(address.toString()); + try { + session.createReceiver(jmsSource); + fail("should throw exception"); + } catch (Exception e) { + //ignore + } + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeWhenNoAddressHasBothRoutingTypesButDefaultQueueIsAnyCast() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + addressInfo.getRoutingTypes().add(RoutingType.ANYCAST); + server.createAddressInfo(addressInfo); + server.createQueue(address, RoutingType.MULTICAST, address, null, true, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + try { + session.createReceiver(address.toString()); + fail("expected exception"); + } catch (Exception e) { + //ignore + } + connection.close(); + } + + + protected Source createJmsSource(boolean topic) { + + Source source = new Source(); + // Set the capability to indicate the node type being created + if (!topic) { + source.setCapabilities(QUEUE_CAPABILITY); + } else { + source.setCapabilities(TOPIC_CAPABILITY); + } + + return source; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java new file mode 100644 index 0000000..377cf86 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport { + + SimpleString address = new SimpleString("testAddress"); + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString()); + sendMessages(1, address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount()); + + receiver.close(); + connection.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java new file mode 100644 index 0000000..9b5187f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + +public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { + + SimpleString address = new SimpleString("testAddress"); + + @Test(timeout = 60000) + public void test2ConsumersOnSharedVolatileAddress() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.NONE); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedVolatileAddressBrokerDefined() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.NONE); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|1"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + receiver2.close(); + //check its **Hasn't** been deleted + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedVolatileAddressNoReceiverClose() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.NONE); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + //check its been deleted + connection.close(); + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedVolatileAddressGlobal() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect(false)); + AmqpSession session = connection.createSession(); + Source source = createSharedGlobalSource(TerminusDurability.NONE); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount()); + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedDurableAddress() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount()); + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedDurableAddressReconnect() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount()); + + connection.close(); + + connection = addConnection(client.connect("myClientId")); + session = connection.createSession(); + + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedDurableAddressReconnectwithNull() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount()); + + connection.close(); + + connection = addConnection(client.connect("myClientId")); + session = connection.createSession(); + + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + receiver = session.createDurableReceiver(null, "mySub"); + receiver2 = session.createDurableReceiver(null, "mySub|2"); + + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedDurableAddressGlobal() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect(false)); + AmqpSession session = connection.createSession(); + Source source = createSharedGlobalSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount()); + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnNonSharedDurableAddress() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createNonSharedSource(TerminusDurability.CONFIGURATION); + Source source1 = createSharedSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + try { + session.createMulticastReceiver(source1, "myReceiverID", "mySub|2"); + fail("Exception expected"); + } catch (Exception e) { + //expected + } + connection.close(); + } + + private Source createNonSharedSource(TerminusDurability terminusDurability) { + Source source = new Source(); + source.setAddress(address.toString()); + source.setCapabilities(TOPIC_CAPABILITY); + source.setDurable(terminusDurability); + return source; + } + + private Source createSharedSource(TerminusDurability terminusDurability) { + Source source = new Source(); + source.setAddress(address.toString()); + source.setCapabilities(TOPIC_CAPABILITY, SHARED); + source.setDurable(terminusDurability); + return source; + } + + private Source createSharedGlobalSource(TerminusDurability terminusDurability) { + Source source = new Source(); + source.setAddress(address.toString()); + source.setCapabilities(TOPIC_CAPABILITY, SHARED, GLOBAL); + source.setDurable(terminusDurability); + return source; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java index 39197fd..3965947 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java @@ -30,6 +30,8 @@ import javax.jms.TopicSubscriber; import java.util.Map; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.After; import org.junit.Assert; @@ -55,6 +57,8 @@ public class ProtonPubSubTest extends ProtonTestBase { @Before public void setUp() throws Exception { super.setUp(); + server.createAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST)); + server.createAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST)); server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true); server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true); factory = new JmsConnectionFactory("amqp://localhost:5672"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 5c56224..5e9b368 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -70,6 +70,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory; @@ -151,20 +153,31 @@ public class ProtonTest extends ProtonTestBase { @Before public void setUp() throws Exception { super.setUp(); - - server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false); - server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "2"), new SimpleString(coreAddress + "2"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "3"), new SimpleString(coreAddress + "3"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "4"), new SimpleString(coreAddress + "4"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "5"), new SimpleString(coreAddress + "5"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "6"), new SimpleString(coreAddress + "6"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "7"), new SimpleString(coreAddress + "7"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "8"), new SimpleString(coreAddress + "8"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "9"), new SimpleString(coreAddress + "9"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "10"), new SimpleString(coreAddress + "10"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic"), new SimpleString("amqp_testtopic"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "3"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "4"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "5"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "6"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "7"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "8"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "9"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "10"), RoutingType.ANYCAST)); + server.createQueue(new SimpleString(coreAddress), RoutingType.ANYCAST, new SimpleString(coreAddress), null, true, false); + server.createQueue(new SimpleString(coreAddress + "1"), RoutingType.ANYCAST, new SimpleString(coreAddress + "1"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "2"), RoutingType.ANYCAST, new SimpleString(coreAddress + "2"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "3"), RoutingType.ANYCAST, new SimpleString(coreAddress + "3"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "4"), RoutingType.ANYCAST, new SimpleString(coreAddress + "4"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "5"), RoutingType.ANYCAST, new SimpleString(coreAddress + "5"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "6"), RoutingType.ANYCAST, new SimpleString(coreAddress + "6"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "7"), RoutingType.ANYCAST, new SimpleString(coreAddress + "7"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "8"), RoutingType.ANYCAST, new SimpleString(coreAddress + "8"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "9"), RoutingType.ANYCAST, new SimpleString(coreAddress + "9"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "10"), RoutingType.ANYCAST, new SimpleString(coreAddress + "10"), null, true, false); + server.createAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST)); + server.createQueue(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST, new SimpleString("amqp_testtopic"), null, true, false); + /* server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "2"), new SimpleString("amqp_testtopic" + "2"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "3"), new SimpleString("amqp_testtopic" + "3"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "4"), new SimpleString("amqp_testtopic" + "4"), null, true, false); @@ -173,7 +186,7 @@ public class ProtonTest extends ProtonTestBase { server.createQueue(new SimpleString("amqp_testtopic" + "7"), new SimpleString("amqp_testtopic" + "7"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false); + server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);*/ connection = createConnection(); @@ -769,6 +782,12 @@ public class ProtonTest extends ProtonTestBase { @Test public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception { + AddressSettings value = new AddressSettings(); + value.setAutoCreateJmsQueues(false); + value.setAutoCreateQueues(false); + value.setAutoCreateAddresses(false); + value.setAutoCreateJmsTopics(false); + server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", value); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpConnection amqpConnection = client.connect(); AmqpSession session = amqpConnection.createSession(); @@ -784,6 +803,7 @@ public class ProtonTest extends ProtonTestBase { assertNotNull(expectedException); assertTrue(expectedException.getMessage().contains("amqp:not-found")); assertTrue(expectedException.getMessage().contains("target address does not exist")); + amqpConnection.close(); } @Test @@ -838,6 +858,7 @@ public class ProtonTest extends ProtonTestBase { @Test public void testClientIdIsSetInSubscriptionList() throws Exception { AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST)); AmqpConnection amqpConnection = client.createConnection(); amqpConnection.setContainerId("testClient"); amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic"))); @@ -866,14 +887,14 @@ public class ProtonTest extends ProtonTestBase { String queueName = "TestQueueName"; String address = "TestAddress"; - - server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST)); + server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpConnection amqpConnection = client.connect(); AmqpSession session = amqpConnection.createSession(); AmqpSender sender = session.createSender(address); - AmqpReceiver receiver = session.createReceiver(queueName); + AmqpReceiver receiver = session.createReceiver(address); receiver.flow(1); AmqpMessage message = new AmqpMessage(); @@ -882,6 +903,7 @@ public class ProtonTest extends ProtonTestBase { AmqpMessage receivedMessage = receiver.receive(5000, TimeUnit.MILLISECONDS); assertNotNull(receivedMessage); + amqpConnection.close(); } @Test http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java index f19b0a4..f424ea2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java @@ -25,7 +25,10 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.util.Random; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.After; @@ -42,6 +45,7 @@ public class SendingAndReceivingTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); server = createServer(true, true); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("exampleQueue"), RoutingType.ANYCAST)); server.start(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index b75e019..829410d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.jlibaio.LibaioContext; @@ -185,12 +186,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - - serverControl.createQueue(address.toString(), name.toString()); + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false); - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertNull(queueControl.getFilter()); @@ -211,12 +212,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - - serverControl.createQueue(address.toString(), name.toString(), filter, durable); + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false); - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertEquals(filter, queueControl.getFilter()); @@ -236,12 +237,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - - serverControl.createQueue(address.toString(), name.toString(), durable); + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false); - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertNull(queueControl.getFilter()); @@ -264,12 +265,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress); checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertNull(queueControl.getFilter()); @@ -297,8 +298,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - - serverControl.createQueue(address.toString(), name.toString(), durable); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false); ServerLocator receiveLocator = createInVMNonHALocator(); ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator); @@ -307,7 +308,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { Assert.assertFalse(consumer.isClosed()); - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); serverControl.destroyQueue(name.toString(), true); Wait.waitFor(new Wait.Condition() { @Override @@ -329,12 +330,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - - serverControl.createQueue(address.toString(), name.toString(), filter, durable); + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false); - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertNull(queueControl.getFilter()); @@ -355,12 +356,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false); - serverControl.createQueue(address.toString(), name.toString(), filter, durable); - - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertNull(queueControl.getFilter()); @@ -383,8 +384,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { // management operations Assert.assertFalse(ActiveMQServerControlTest.contains(name.toString(), serverControl.getQueueNames())); - - serverControl.createQueue(address.toString(), name.toString()); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false); Assert.assertTrue(ActiveMQServerControlTest.contains(name.toString(), serverControl.getQueueNames())); serverControl.destroyQueue(name.toString()); @@ -402,8 +403,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { // management operations Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); - - serverControl.createQueue(address.toString(), name.toString()); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false); Assert.assertTrue(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); serverControl.destroyQueue(name.toString()); @@ -1212,7 +1213,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory factory = createSessionFactory(locator); ClientSession session = addClientSession(factory.createSession()); - server.createQueue(queueName, queueName, null, false, false); + server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST)); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false); addClientConsumer(session.createConsumer(queueName)); Thread.sleep(100); // We check the timestamp for the creation time. We need to make sure it's different addClientConsumer(session.createConsumer(queueName, SimpleString.toSimpleString(filter), true)); @@ -1269,8 +1271,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ServerLocator locator2 = createInVMNonHALocator(); ClientSessionFactory factory2 = createSessionFactory(locator2); ClientSession session2 = addClientSession(factory2.createSession()); - - server.createQueue(queueName, queueName, null, false, false); + serverControl.createAddress(queueName.toString(), "ANYCAST"); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false); addClientConsumer(session.createConsumer(queueName)); Thread.sleep(200); @@ -1335,7 +1337,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { @Test public void testListSessionsAsJSON() throws Exception { SimpleString queueName = new SimpleString(UUID.randomUUID().toString()); - server.createQueue(queueName, queueName, null, false, false); + server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST)); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false); ActiveMQServerControl serverControl = createManagementControl(); ServerLocator locator = createInVMNonHALocator(); @@ -1400,8 +1403,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase { this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params)); server2.start(); - server.createQueue(address, address, null, true, false); - server2.createQueue(address, address, null, true, false); + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false); + server2.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server2.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false); ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator); ClientSession session = csf.createSession(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 280fdc4..2831f79 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -127,6 +127,21 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes } @Override + public void createQueue(String address, String name, String routingType) throws Exception { + proxy.invokeOperation("createQueue", address, name, routingType); + } + + @Override + public void createQueue(String address, String name, boolean durable, String routingType) throws Exception { + proxy.invokeOperation("createQueue", address, name, durable, routingType); + } + + @Override + public void createQueue(String address,String name, String filter, boolean durable, String routingType) throws Exception { + proxy.invokeOperation("createQueue", address, name, filter, durable, routingType); + } + + @Override public void createQueue(final String address, final String name, final boolean durable) throws Exception { proxy.invokeOperation("createQueue", address, name, durable); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java index 6bc8f3d..11785e4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl; import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.core.server.RoutingType; public class ManagementControlHelper { @@ -73,6 +74,13 @@ public class ManagementControlHelper { return (QueueControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, ActiveMQDefaultConfiguration.getDefaultRoutingType()), QueueControl.class, mbeanServer); } + public static QueueControl createQueueControl(final SimpleString address, + final SimpleString name, + final RoutingType routingType, + final MBeanServer mbeanServer) throws Exception { + return (QueueControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, routingType), QueueControl.class, mbeanServer); + } + public static AddressControl createAddressControl(final SimpleString address, final MBeanServer mbeanServer) throws Exception { return (AddressControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getAddressObjectName(address), AddressControl.class, mbeanServer); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java index e6026c4..63cf579 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java @@ -33,7 +33,6 @@ import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.RoutingType; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.command.ActiveMQDestination; import org.junit.After; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index f2c844e..d272c02 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; @@ -51,6 +52,17 @@ public class FakePostOffice implements PostOffice { } @Override + public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) { + + return null; + } + + @Override + public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) { + return null; + } + + @Override public void start() throws Exception { }
