Add missing import; fix MQTT JMSSend Test
checkstyle Ensure AddressInfo record is created on autoCreate Catch QueueExists exception when auto creating Fix Netty Stress Test Fix PendingDeliveryTest Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/56c3f977 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/56c3f977 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/56c3f977 Branch: refs/heads/ARTEMIS-780 Commit: 56c3f977752718a4c2ff9ef1c8a97db4520b246d Parents: 9e709a3 Author: Martyn Taylor <[email protected]> Authored: Thu Dec 1 12:45:22 2016 +0000 Committer: Clebert Suconic <[email protected]> Committed: Wed Dec 7 13:16:36 2016 -0500 ---------------------------------------------------------------------- .../artemis/jms/client/ActiveMQMessage.java | 1 + .../jms/client/ActiveMQMessageProducer.java | 6 +- .../artemis/jms/client/ActiveMQSession.java | 29 ++- .../core/protocol/mqtt/MQTTPublishManager.java | 16 +- .../protocol/mqtt/MQTTSubscriptionManager.java | 2 +- .../core/server/impl/ActiveMQServerImpl.java | 4 +- .../integration/addressing/AnycastTest.java | 188 +++++++++++++++++++ .../client/JmsNettyNioStressTest.java | 8 +- .../clientcrash/PendingDeliveriesTest.java | 3 +- .../integration/mqtt/imported/MQTTTest.java | 11 +- 10 files changed, 242 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/56c3f977/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index e558197..4f0be81 100644 --- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.UUID; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/56c3f977/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index 4c1d335..b814bc2 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -36,6 +36,7 @@ import javax.jms.TopicPublisher; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -421,7 +422,10 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To } else { connection.addKnownDestination(address); } - } catch (ActiveMQException e) { + } catch 
(ActiveMQQueueExistsException e) { + // The queue was created by another client/admin between the query check and send create queue packet + } + catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/56c3f977/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 3e9b76f..a25215e 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -301,15 +301,21 @@ public class ActiveMQSession implements QueueSession, TopicSession { ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress()); if (!response.isExists()) { - if (jbd.isQueue() && response.isAutoCreateJmsQueues()) { - // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers) - session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true); - session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true); - } else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) { - session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true); - } else { - throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); + try { + if (jbd.isQueue() && response.isAutoCreateJmsQueues()) { + // perhaps just relying on the broker to do it is simplest (i.e. 
deleteOnNoConsumers) + session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true); + session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true); + } else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) { + session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true); + } else { + throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); + } + } + catch (ActiveMQQueueExistsException e) { + // Queue was created between our query and create queue request. Ignore. } + } } @@ -647,7 +653,12 @@ public class ActiveMQSession implements QueueSession, TopicSession { */ if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) { if (response.isAutoCreateJmsQueues()) { - session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true); + try { + session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true); + } + catch (ActiveMQQueueExistsException e) { + // The queue was created by another client/admin between the query check and send create queue packet + } } else { throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/56c3f977/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 3a2ad7e..c266e76 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -83,7 +83,7 @@ public class MQTTPublishManager { } private void createManagementAddress() { - managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId()); + managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId()); } private void createManagementQueue() throws Exception { @@ -113,10 +113,13 @@ public class MQTTPublishManager { if (qos == 0) { sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos); session.getServerSession().acknowledge(consumer.getID(), message.getMessageID()); - } else { + } else if (qos == 1 || qos == 2) { int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID()); outboundStore.publish(mqttid, message.getMessageID(), consumer.getID()); sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos); + } else { + // Client must have disconnected and it's Subscription QoS cleared + consumer.individualCancel(message.getMessageID(), false); } } } @@ -231,7 +234,14 @@ public class MQTTPublishManager { } private int decideQoS(ServerMessage message, ServerConsumer consumer) { - int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID()); + + int subscriptionQoS = -1; + try { + subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID()); + } catch (NullPointerException e) { + // This can happen if the client disconnected during a server send. 
+ return subscriptionQoS; + } int qos = 2; if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/56c3f977/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index b3542d3..c4b8b94 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -94,7 +94,7 @@ public class MQTTSubscriptionManager { Queue q = session.getServer().locateQueue(queue); if (q == null) { - q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, true); + q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false); } else { if (q.isDeleteOnNoConsumers()) { throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/56c3f977/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 2208cec..abcbb89 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2477,7 +2477,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (info == null) { if (autoCreateAddress) { - postOffice.addAddressInfo(defaultAddressInfo.setAutoCreated(true)); + createAddressInfo(defaultAddressInfo.setAutoCreated(true)); addressAlreadyExists = false; } else { throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName); @@ -2490,7 +2490,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { AddressInfo addressInfo = postOffice.getAddressInfo(queue.getAddress()); if (addressInfo == null) { - postOffice.addAddressInfo(new AddressInfo(queue.getAddress())); + createAddressInfo(new AddressInfo(queue.getAddress())); } else { if (!addressInfo.getRoutingTypes().contains(routingType)) { throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, addressInfo.getName().toString(), addressInfo.getRoutingTypes()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/56c3f977/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java new file mode 100644 index 0000000..9208386 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.addressing; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.TimeUtils; +import org.junit.Before; +import org.junit.Test; + +public class AnycastTest extends ActiveMQTestBase { + + private SimpleString baseAddress = new SimpleString("anycast.address"); + + private 
AddressInfo addressInfo; + + private ActiveMQServer server; + + private ClientSessionFactory sessionFactory; + + @Before + public void setup() throws Exception { + server = createServer(true); + server.start(); + + server.waitForActivation(10, TimeUnit.SECONDS); + + ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + sessionFactory = sl.createSessionFactory(); + + addSessionFactory(sessionFactory); + + addressInfo = new AddressInfo(baseAddress); + addressInfo.addRoutingType(RoutingType.ANYCAST); + server.createOrUpdateAddressInfo(addressInfo); + } + + @Test + public void testTxCommitReceive() throws Exception { + + Queue q1 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + Queue q2 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + + ClientSession session = sessionFactory.createSession(false, false); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(baseAddress); + + final int num = 10; + + for (int i = 0; i < num; i++) { + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + m.getBodyBuffer().writeString("AnyCast" + i); + producer.send(m); + } + assertNull(consumer1.receive(200)); + assertNull(consumer2.receive(200)); + session.commit(); + + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q1.getMessageCount())); + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q2.getMessageCount())); + + ClientConsumer[] consumers = new ClientConsumer[]{consumer1, consumer2}; + for (int i = 0; i < consumers.length; i++) { + + for (int j = 0; j < num / 2; j++) { + ClientMessage m = consumers[i].receive(2000); + 
assertNotNull(m); + System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString()); + } + + assertNull(consumers[i].receive(200)); + session.commit(); + + assertNull(consumers[i].receive(200)); + } + + q1.deleteQueue(); + q2.deleteQueue(); + } + + @Test + public void testTxRollbackReceive() throws Exception { + + Queue q1 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + Queue q2 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + + ClientSession session = sessionFactory.createSession(false, false); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(baseAddress); + + final int num = 10; + + for (int i = 0; i < num; i++) { + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + m.getBodyBuffer().writeString("AnyCast" + i); + producer.send(m); + } + assertNull(consumer1.receive(200)); + assertNull(consumer2.receive(200)); + session.commit(); + session.close(); + + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q1.getMessageCount())); + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q2.getMessageCount())); + + ClientSession session1 = sessionFactory.createSession(false, false); + ClientSession session2 = sessionFactory.createSession(false, false); + session1.start(); + session2.start(); + + consumer1 = session1.createConsumer(q1.getName()); + consumer2 = session2.createConsumer(q2.getName()); + + ClientConsumer[] consumers = new ClientConsumer[]{consumer1, consumer2}; + ClientSession[] sessions = new ClientSession[]{session1, session2}; + Queue[] queues = new Queue[]{q1, q2}; + + for (int i = 0; i < consumers.length; i++) { + + for (int j = 0; j < num 
/ 2; j++) { + ClientMessage m = consumers[i].receive(2000); + assertNotNull(m); + System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString()); + } + + assertNull(consumers[i].receive(200)); + sessions[i].rollback(); + sessions[i].close(); + + sessions[i] = sessionFactory.createSession(false, false); + sessions[i].start(); + + //receive same after rollback + consumers[i] = sessions[i].createConsumer(queues[i].getName()); + + for (int j = 0; j < num / 2; j++) { + ClientMessage m = consumers[i].receive(2000); + assertNotNull(m); + System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString()); + } + + assertNull(consumers[i].receive(200)); + sessions[i].commit(); + + assertNull(consumers[i].receive(200)); + sessions[i].close(); + } + + q1.deleteQueue(); + q2.deleteQueue(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/56c3f977/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java index a721aca..ccbc4b2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSession; import 
org.apache.activemq.artemis.api.core.client.ClientSessionFactory; @@ -36,6 +37,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -128,8 +130,10 @@ public class JmsNettyNioStressTest extends ActiveMQTestBase { // create the 2 queues used in the test ClientSessionFactory sf = locator.createSessionFactory(transpConf); ClientSession session = sf.createTransactedSession(); - session.createQueue("queue", "queue"); - session.createQueue("queue2", "queue2"); + session.createAddress(SimpleString.toSimpleString("queue"), RoutingType.ANYCAST, false); + session.createAddress(SimpleString.toSimpleString("queue2"), RoutingType.ANYCAST, false); + session.createQueue("queue", RoutingType.ANYCAST, "queue"); + session.createQueue("queue2", RoutingType.ANYCAST, "queue2"); session.commit(); sf.close(); session.close(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/56c3f977/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java index e550bef..0738562 100644 --- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java @@ -26,6 +26,7 @@ import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.SpawnedVMSupport; @@ -39,7 +40,7 @@ public class PendingDeliveriesTest extends ClientTestBase { @Before public void createQueue() throws Exception { - server.createQueue(SimpleString.toSimpleString("queue1"), SimpleString.toSimpleString("queue1"), null, true, false); + server.createQueue(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST, SimpleString.toSimpleString("queue1"), null, true, false); } @After http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/56c3f977/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index c342853..58d75d8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -1096,8 +1096,8 @@ public class MQTTTest extends MQTTTestSupport { connection.start(); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Queue queue = 
s.createQueue(destinationName); - MessageProducer producer = s.createProducer(queue); + javax.jms.Topic topic = s.createTopic(destinationName); + MessageProducer producer = s.createProducer(topic); // send retained message from JMS final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; @@ -1626,10 +1626,7 @@ public class MQTTTest extends MQTTTestSupport { SimpleString coreAddress = new SimpleString("foo.bar"); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; - AddressInfo addressInfo = new AddressInfo(coreAddress); - getServer().createOrUpdateAddressInfo(addressInfo); - - getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, false); + getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, true); MQTT mqtt = createMQTTConnection(); mqtt.setClientId(clientId); @@ -1675,7 +1672,7 @@ public class MQTTTest extends MQTTTestSupport { try { String clientId = "testMqtt"; SimpleString coreAddress = new SimpleString("foo.bar"); - getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, false); + getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
