ARTEMIS-789 Add OpenWire Tests for MULTICAST,ANYCAST
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a182a135 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a182a135 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a182a135 Branch: refs/heads/master Commit: a182a135e963c1a4355c3f7d7d72d5ee6b70ed14 Parents: b742a35 Author: Howard Gao <[email protected]> Authored: Thu Dec 1 20:01:57 2016 +0800 Committer: Martyn Taylor <[email protected]> Committed: Fri Dec 9 18:43:15 2016 +0000 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 6 +- .../core/protocol/openwire/amq/AMQConsumer.java | 7 +- .../core/protocol/openwire/amq/AMQSession.java | 10 +- .../integration/addressing/MulticastTest.java | 189 +++++++++++++++++++ .../integration/openwire/BasicOpenWireTest.java | 9 +- 5 files changed, 211 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a182a135/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index fb7a364..d6add20 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; @@ -63,6 +62,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; @@ -716,13 +716,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName); if (binding == null) { if (dest.isTemporary()) { - internalSession.createQueue(qName, qName, null, dest.isTemporary(), false); + internalSession.createQueue(qName, qName, RoutingType.ANYCAST, null, dest.isTemporary(), false); } else { ConnectionInfo connInfo = getState().getInfo(); CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; server.getSecurityStore().check(qName, checkType, this); server.checkQueueCreationLimit(getUsername()); - server.createQueue(qName, ActiveMQDefaultConfiguration.DEFAULT_ROUTING_TYPE, qName, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), true, false); + server.createQueue(qName, RoutingType.ANYCAST, qName, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), true, false); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a182a135/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 1803cc8..ef54b59 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.command.ConsumerControl; @@ -130,6 +131,10 @@ public class AMQConsumer { SimpleString queueName; + AddressInfo addressInfo = session.getCoreServer().getAddressInfo(address); + if (addressInfo != null) { + addressInfo.addRoutingType(RoutingType.MULTICAST); + } if (isDurable) { queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName)); QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName); @@ -160,7 +165,7 @@ public class AMQConsumer { } else { queueName = new SimpleString(UUID.randomUUID().toString()); - session.getCoreSession().createQueue(address, queueName, selector, true, false); + session.getCoreSession().createQueue(address, queueName, RoutingType.MULTICAST, selector, true, false); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a182a135/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index a92a379..c64374a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -23,7 +23,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; @@ -34,6 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; @@ -173,7 +173,7 @@ public class AMQSession implements SessionCallback { if (!queueBinding.isExists()) { if (isAutoCreate) { - server.createQueue(queueName, ActiveMQDefaultConfiguration.DEFAULT_ROUTING_TYPE, queueName, null, true, isTemporary); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, isTemporary); connection.addKnownDestination(queueName); } else { hasQueue = false; @@ -279,6 +279,7 @@ public class AMQSession implements SessionCallback { messageSend.setBrokerInTime(System.currentTimeMillis()); ActiveMQDestination destination = messageSend.getDestination(); + ActiveMQDestination[] actualDestinations = null; if (destination.isComposite()) { actualDestinations = destination.getCompositeDestinations(); @@ -382,6 +383,11 @@ public class AMQSession implements SessionCallback { checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary()); } + if (actualDestinations[i].isQueue()) { + coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType()); + } else { + coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); + } RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary()); if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a182a135/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java new file mode 100644 index 0000000..487a9f9 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.addressing; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.TimeUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +public class MulticastTest extends ActiveMQTestBase { + + private SimpleString baseAddress = new SimpleString("multicast.address"); + + private AddressInfo addressInfo; + + private ActiveMQServer server; + + private ClientSessionFactory sessionFactory; + + @Before + public void setup() throws Exception { + server = createServer(true); + server.start(); + + server.waitForActivation(10, TimeUnit.SECONDS); + + ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + sessionFactory = sl.createSessionFactory(); + + addSessionFactory(sessionFactory); + + addressInfo = new AddressInfo(baseAddress); + addressInfo.addRoutingType(RoutingType.MULTICAST); + server.createOrUpdateAddressInfo(addressInfo); + } + + @Test + public void testTxCommitReceive() throws Exception { + + Queue q1 = server.createQueue(baseAddress, RoutingType.MULTICAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + Queue q2 = server.createQueue(baseAddress, RoutingType.MULTICAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + + ClientSession session = sessionFactory.createSession(false, false); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(baseAddress); + + final int num = 10; + + for (int i = 0; i < num; i++) { + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + m.getBodyBuffer().writeString("AnyCast" + i); + producer.send(m); + } + assertNull(consumer1.receive(200)); + assertNull(consumer2.receive(200)); + session.commit(); + + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num == q1.getMessageCount())); + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num == q2.getMessageCount())); + + ClientConsumer[] consumers = new ClientConsumer[] {consumer1, consumer2}; + for (int i = 0; i < consumers.length; i++) { + + for (int j = 0; j < num; j++) { + ClientMessage m = consumers[i].receive(2000); + assertNotNull(m); + System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString()); + } + + assertNull(consumers[i].receive(200)); + session.commit(); + + assertNull(consumers[i].receive(200)); + } + + q1.deleteQueue(); + q2.deleteQueue(); + } + + @Test + public void testTxRollbackReceive() throws Exception { + + Queue q1 = server.createQueue(baseAddress, RoutingType.MULTICAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + Queue q2 = server.createQueue(baseAddress, RoutingType.MULTICAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + + ClientSession session = sessionFactory.createSession(false, false); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(baseAddress); + + final int num = 10; + + for (int i = 0; i < num; i++) { + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + m.getBodyBuffer().writeString("AnyCast" + i); + producer.send(m); + } + assertNull(consumer1.receive(200)); + assertNull(consumer2.receive(200)); + session.commit(); + session.close(); + + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num == q1.getMessageCount())); + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num == q2.getMessageCount())); + + ClientSession session1 = sessionFactory.createSession(false, false); + ClientSession session2 = sessionFactory.createSession(false, false); + session1.start(); + session2.start(); + + consumer1 = session1.createConsumer(q1.getName()); + consumer2 = session2.createConsumer(q2.getName()); + + ClientConsumer[] consumers = new ClientConsumer[] {consumer1, consumer2}; + ClientSession[] sessions = new ClientSession[] {session1, session2}; + Queue[] queues = new Queue[] {q1, q2}; + + for (int i = 0; i < consumers.length; i++) { + + for (int j = 0; j < num; j++) { + ClientMessage m = consumers[i].receive(2000); + assertNotNull(m); + System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString()); + } + + assertNull(consumers[i].receive(200)); + sessions[i].rollback(); + sessions[i].close(); + + sessions[i] = sessionFactory.createSession(false, false); + sessions[i].start(); + + //receive same after rollback + consumers[i] = sessions[i].createConsumer(queues[i].getName()); + + for (int j = 0; j < num; j++) { + ClientMessage m = consumers[i].receive(2000); + assertNotNull(m); + System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString()); + } + + assertNull(consumers[i].receive(200)); + sessions[i].commit(); + + assertNull(consumers[i].receive(200)); + sessions[i].close(); + } + + q1.deleteQueue(); + q2.deleteQueue(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a182a135/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java index 7c40834..e6026c4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java @@ -33,6 +33,7 @@ import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.command.ActiveMQDestination; import org.junit.After; import org.junit.Before; @@ -65,15 +66,15 @@ public class BasicOpenWireTest extends OpenWireTestBase { public void setUp() throws Exception { super.setUp(); SimpleString coreQueue = new SimpleString(queueName); - this.server.createQueue(coreQueue, RoutingType.MULTICAST, coreQueue, null, false, false); + this.server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false); testQueues.put(queueName, coreQueue); SimpleString coreQueue2 = new SimpleString(queueName2); - this.server.createQueue(coreQueue2, RoutingType.MULTICAST, coreQueue2, null, false, false); + this.server.createQueue(coreQueue2, RoutingType.ANYCAST, coreQueue2, null, false, false); testQueues.put(queueName2, coreQueue2); SimpleString durableQueue = new SimpleString(durableQueueName); - this.server.createQueue(durableQueue, RoutingType.MULTICAST, durableQueue, null, true, false); + this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false); testQueues.put(durableQueueName, durableQueue); if (!enableSecurity) { @@ -139,7 +140,7 @@ public class BasicOpenWireTest extends OpenWireTestBase { SimpleString coreQ = testQueues.get(qname); if (coreQ == null) { coreQ = new SimpleString(qname); - this.server.createQueue(coreQ, RoutingType.MULTICAST, coreQ, null, false, false); + this.server.createQueue(coreQ, RoutingType.ANYCAST, coreQ, null, false, false); testQueues.put(qname, coreQ); } }
