Repository: qpid-broker-j Updated Branches: refs/heads/master c2c2bc470 -> e4598dcd6
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java b/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java index 2512332..2e471c6 100644 --- a/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java +++ b/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import javax.jms.Connection; @@ -38,6 +39,9 @@ import javax.jms.Topic; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.server.exchange.ExchangeDefaults; +import org.apache.qpid.server.model.AlternateBinding; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy; @@ -47,6 +51,8 @@ import org.apache.qpid.test.utils.TestBrokerConfiguration; public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase { + private static final String DEAD_LETTER_QUEUE_SUFFIX = "_DLQ"; + private static final String DEAD_LETTER_EXCHANGE_SUFFIX = "_DLE"; private Connection _connection; private Session _session; @@ -122,7 +128,73 @@ public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase @Override public Map<String, Object> getAttributes() { - return Collections.<String, Object>singletonMap(Exchange.TYPE, "fanout"); + return Collections.singletonMap(Exchange.TYPE, "fanout"); + } + }, + + new NodeAutoCreationPolicy() + { + @Override + public String getPattern() + { + return ".*" + DEAD_LETTER_QUEUE_SUFFIX; + } + + @Override + public boolean isCreatedOnPublish() + { + return true; + } + + @Override + public boolean isCreatedOnConsume() + { + return true; + } + + @Override + public String getNodeType() + { + return "Queue"; + } + + @Override + public Map<String, Object> getAttributes() + { + return Collections.emptyMap(); + } + }, + + new NodeAutoCreationPolicy() + { + @Override + public String getPattern() + { + return ".*" + DEAD_LETTER_EXCHANGE_SUFFIX; + } + + @Override + public boolean isCreatedOnPublish() + { + return true; + } + + @Override + public boolean isCreatedOnConsume() + { + return false; + } + + @Override + public String getNodeType() + { + return "Exchange"; + } + + @Override + public Map<String, Object> getAttributes() + { + return Collections.singletonMap(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); } } }; @@ -266,4 +338,102 @@ public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase // pass } } + + public void testQueueAlternateBindingCreation() throws Exception + { + Connection connection = getConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + String queueName = getTestQueueName(); + String deadLetterQueueName = queueName + DEAD_LETTER_QUEUE_SUFFIX; + + final Map<String, Object> attributes = new HashMap<>(); + Map<String, Object> expectedAlternateBinding = + Collections.singletonMap(AlternateBinding.DESTINATION, deadLetterQueueName); + attributes.put(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING, + new ObjectMapper().writeValueAsString(expectedAlternateBinding)); + createEntityUsingAmqpManagement(queueName, + session, + "org.apache.qpid.Queue", attributes); + + Map<String, Object> queueAttributes = + managementReadObject(session, "org.apache.qpid.Queue", queueName, true); + + Object actualAlternateBinding = queueAttributes.get(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING); + Map<String, Object> actualAlternateBindingMap = convertIfNecessary(actualAlternateBinding); + assertEquals("Unexpected alternate binding", + new HashMap<>(expectedAlternateBinding), + new HashMap<>(actualAlternateBindingMap)); + + assertNotNull("Cannot get dead letter queue", + managementReadObject(session, "org.apache.qpid.Queue", deadLetterQueueName, true)); + } + + public void testExchangeAlternateBindingCreation() throws Exception + { + Connection connection = getConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + String exchangeName = getTestQueueName(); + String deadLetterExchangeName = exchangeName + DEAD_LETTER_EXCHANGE_SUFFIX; + + final Map<String, Object> attributes = new HashMap<>(); + Map<String, Object> expectedAlternateBinding = + Collections.singletonMap(AlternateBinding.DESTINATION, deadLetterExchangeName); + attributes.put(Exchange.ALTERNATE_BINDING, new ObjectMapper().writeValueAsString(expectedAlternateBinding)); + attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + createEntityUsingAmqpManagement(exchangeName, + session, + "org.apache.qpid.DirectExchange", attributes); + + Map<String, Object> exchangeAttributes = + managementReadObject(session, "org.apache.qpid.Exchange", exchangeName, true); + + Object actualAlternateBinding = exchangeAttributes.get(Exchange.ALTERNATE_BINDING); + Map<String, Object> actualAlternateBindingMap = convertIfNecessary(actualAlternateBinding); + assertEquals("Unexpected alternate binding", + new HashMap<>(expectedAlternateBinding), + new HashMap<>(actualAlternateBindingMap)); + + assertNotNull("Cannot get dead letter exchange", + managementReadObject(session, "org.apache.qpid.FanoutExchange", deadLetterExchangeName, true)); + } + + public void testLegacyQueueDeclareArgumentAlternateBindingCreation() throws Exception + { + Connection connection = getConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + final Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DLQ_ENABLED, true); + String testQueueName = getTestQueueName(); + ((AMQSession<?,?>) session).createQueue(testQueueName, false, true, false, arguments); + + + Map<String, Object> queueAttributes = + managementReadObject(session, "org.apache.qpid.Queue", testQueueName, true); + + Object actualAlternateBinding = queueAttributes.get(Exchange.ALTERNATE_BINDING); + assertTrue("Unexpected alternate binding", actualAlternateBinding instanceof Map); + Object deadLetterQueueName = ((Map<String, Object>) actualAlternateBinding).get(AlternateBinding.DESTINATION); + + assertNotNull("Cannot get dead letter queue", + managementReadObject(session, "org.apache.qpid.Queue", String.valueOf(deadLetterQueueName), true)); + } + + private Map<String, Object> convertIfNecessary(final Object actualAlternateBinding) throws IOException + { + Map<String, Object> actualAlternateBindingMap; + if (actualAlternateBinding instanceof String) + { + actualAlternateBindingMap = new ObjectMapper().readValue((String)actualAlternateBinding, Map.class); + } + else + { + actualAlternateBindingMap = (Map<String, Object>) actualAlternateBinding; + } + return actualAlternateBindingMap; + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/server/routing/AlternateBindingRoutingTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/server/routing/AlternateBindingRoutingTest.java b/systests/src/test/java/org/apache/qpid/server/routing/AlternateBindingRoutingTest.java new file mode 100644 index 0000000..86ab13f --- /dev/null +++ b/systests/src/test/java/org/apache/qpid/server/routing/AlternateBindingRoutingTest.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.routing; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Queue; +import javax.jms.Session; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.qpid.server.model.AlternateBinding; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class AlternateBindingRoutingTest extends QpidBrokerTestCase +{ + public void testFanoutExchangeAsAlternateBinding() throws Exception + { + Connection connection = getConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + String queueName = getTestQueueName(); + String deadLetterQueueName = queueName + "_DeadLetter"; + String deadLetterExchangeName = "deadLetterExchange"; + + Queue deadLetterQueue = createTestQueue(session, deadLetterQueueName); + + createEntityUsingAmqpManagement(deadLetterExchangeName, + session, + "org.apache.qpid.FanoutExchange"); + + final Map<String, Object> arguments = new HashMap<>(); + arguments.put("destination", deadLetterQueueName); + arguments.put("bindingKey", queueName); + performOperationUsingAmqpManagement(deadLetterExchangeName, + "bind", + session, + "org.apache.qpid.Exchange", + arguments); + + final Map<String, Object> attributes = new HashMap<>(); + attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName); + attributes.put(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING, + new ObjectMapper().writeValueAsString(Collections.singletonMap(AlternateBinding.DESTINATION, + deadLetterExchangeName))); + createEntityUsingAmqpManagement(queueName, + session, + "org.apache.qpid.StandardQueue", + attributes); + Queue testQueue = getQueueFromName(session, queueName); + + sendMessage(session, testQueue, 1); + assertEquals("Unexpected number of messages on queueName", 1, getQueueDepth(connection, testQueue)); + + assertEquals("Unexpected number of messages on DLQ queueName", 0, getQueueDepth(connection, deadLetterQueue)); + + performOperationUsingAmqpManagement(queueName, + "DELETE", + session, + "org.apache.qpid.Queue", + Collections.emptyMap()); + + assertEquals("Unexpected number of messages on DLQ queueName", 1, getQueueDepth(connection, deadLetterQueue)); + } + +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index 5d1263f..d7d94e4 100644 --- a/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -731,7 +731,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable); attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, durable ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_BINDING, null); exchange = _virtualHost.createChild(Exchange.class, attributes); return exchange; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/systest/rest/ExchangeRestTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/systest/rest/ExchangeRestTest.java b/systests/src/test/java/org/apache/qpid/systest/rest/ExchangeRestTest.java index 6b5099f..8b39eda 100644 --- a/systests/src/test/java/org/apache/qpid/systest/rest/ExchangeRestTest.java +++ b/systests/src/test/java/org/apache/qpid/systest/rest/ExchangeRestTest.java @@ -21,10 +21,12 @@ package org.apache.qpid.systest.rest; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.qpid.server.model.AlternateBinding; import org.apache.qpid.server.model.Exchange; public class ExchangeRestTest extends QpidRestTestCase @@ -74,24 +76,24 @@ public class ExchangeRestTest extends QpidRestTestCase String exchangeName = getTestName(); String exchangeUrl = "exchange/test/test/" + exchangeName; - Map<String, Object> attributes = new HashMap<String, Object>(); + Map<String, Object> attributes = new HashMap<>(); attributes.put(Exchange.NAME, exchangeName); attributes.put(Exchange.TYPE, "direct"); - int responseCode = getRestTestHelper().submitRequest(exchangeUrl, "PUT", attributes); - assertEquals("Exchange should be created", 201, responseCode); + getRestTestHelper().submitRequest(exchangeUrl, "PUT", attributes, 201); Map<String, Object> exchange = getRestTestHelper().getJsonAsSingletonList(exchangeUrl); assertNotNull("Exchange not found", exchange); - attributes = new HashMap<String, Object>(); + attributes = new HashMap<>(); attributes.put(Exchange.NAME, exchangeName); - attributes.put(Exchange.ALTERNATE_EXCHANGE, "amq.direct"); + attributes.put(Exchange.ALTERNATE_BINDING, + Collections.singletonMap(AlternateBinding.DESTINATION, "amq.direct")); - responseCode = getRestTestHelper().submitRequest(exchangeUrl, "PUT", attributes); - assertEquals("Exchange update should be supported", 200, responseCode); + getRestTestHelper().submitRequest(exchangeUrl, "PUT", attributes, 200); exchange = getRestTestHelper().getJsonAsSingletonList(exchangeUrl); assertNotNull("Exchange not found", exchange); - assertEquals("amq.direct",exchange.get(Exchange.ALTERNATE_EXCHANGE)); + assertEquals(new HashMap<>(Collections.singletonMap(AlternateBinding.DESTINATION, "amq.direct")), + new HashMap<>(((Map<String, Object>) exchange.get(Exchange.ALTERNATE_BINDING)))); } private void assertExchange(String exchangeName, Map<String, Object> exchange) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index 712eafc..52fa38c 100644 --- a/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -43,7 +43,6 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.LastValueQueue; import org.apache.qpid.server.queue.PriorityQueue; import org.apache.qpid.server.queue.SortedQueue; -import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy; import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; import org.apache.qpid.server.virtualhost.derby.DerbyVirtualHostImpl; @@ -679,46 +678,6 @@ public class VirtualHostRestTest extends QpidRestTestCase assertEquals("Unexpected priorities key attribute", 10, priorityQueue.get(PriorityQueue.PRIORITIES)); } - @SuppressWarnings("unchecked") - public void testCreateQueueWithDLQEnabled() throws Exception - { - String queueName = getTestQueueName(); - - Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true); - - //verify the starting state - Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("virtualhost/test"); - List<Map<String, Object>> queues = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_QUEUES_ATTRIBUTE); - List<Map<String, Object>> exchanges = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_EXCHANGES_ATTRIBUTE); - - assertNull("queue "+ queueName + " should not have already been present", getRestTestHelper().find(Queue.NAME, queueName , queues)); - assertNull("queue "+ queueName + "_DLQ should not have already been present", getRestTestHelper().find(Queue.NAME, queueName + "_DLQ" , queues)); - assertNull("exchange should not have already been present", getRestTestHelper().find(Exchange.NAME, queueName + "_DLE" , exchanges)); - - //create the queue - createQueue(queueName, "standard", attributes); - - //verify the new queue, as well as the DLQueue and DLExchange have been created - hostDetails = getRestTestHelper().getJsonAsSingletonList("virtualhost/test"); - queues = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_QUEUES_ATTRIBUTE); - exchanges = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_EXCHANGES_ATTRIBUTE); - - Map<String, Object> queue = getRestTestHelper().find(Queue.NAME, queueName , queues); - Map<String, Object> dlqQueue = getRestTestHelper().find(Queue.NAME, queueName + "_DLQ" , queues); - Map<String, Object> dlExchange = getRestTestHelper().find(Exchange.NAME, queueName + "_DLE" , exchanges); - assertNotNull("queue should have been present", queue); - assertNotNull("queue should have been present", dlqQueue); - assertNotNull("exchange should have been present", dlExchange); - - //verify that the alternate exchange is set as expected on the new queue - Map<String, Object> queueAttributes = new HashMap<String, Object>(); - queueAttributes.put(Queue.ALTERNATE_EXCHANGE, queueName + "_DLE"); - - Asserts.assertQueue(queueName, "standard", queue, queueAttributes); - Asserts.assertQueue(queueName, "standard", queue, null); - } - public void testObjectsWithSlashes() throws Exception { String queueName = "testQueue/with/slashes"; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/systest/rest/acl/ExchangeRestACLTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/systest/rest/acl/ExchangeRestACLTest.java b/systests/src/test/java/org/apache/qpid/systest/rest/acl/ExchangeRestACLTest.java index 15461d7..dc35c90 100644 --- a/systests/src/test/java/org/apache/qpid/systest/rest/acl/ExchangeRestACLTest.java +++ b/systests/src/test/java/org/apache/qpid/systest/rest/acl/ExchangeRestACLTest.java @@ -35,6 +35,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.qpid.server.management.plugin.servlet.rest.AbstractServlet; import org.apache.qpid.server.model.AccessControlProvider; +import org.apache.qpid.server.model.AlternateBinding; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.systest.rest.QpidRestTestCase; @@ -197,33 +198,33 @@ public class ExchangeRestACLTest extends QpidRestTestCase { getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER); - int responseCode = createExchange(); + createExchange(); assertExchangeExists(); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Exchange.NAME, _exchangeName); - attributes.put(Exchange.ALTERNATE_EXCHANGE, "my-alternate-exchange"); + attributes.put(Exchange.ALTERNATE_BINDING, + Collections.singletonMap(AlternateBinding.DESTINATION, "my-alternate-exchange")); - responseCode = getRestTestHelper().submitRequest(_exchangeUrl, "PUT", attributes); - assertEquals("Exchange 'my-alternate-exchange' does not exist", AbstractServlet.SC_UNPROCESSABLE_ENTITY, responseCode); + getRestTestHelper().submitRequest(_exchangeUrl, "PUT", attributes, AbstractServlet.SC_UNPROCESSABLE_ENTITY); } public void testSetExchangeAttributesDenied() throws Exception { getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER); - int responseCode = createExchange(); + createExchange(); assertExchangeExists(); getRestTestHelper().setUsernameAndPassword(DENIED_USER, DENIED_USER); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Exchange.NAME, _exchangeName); - attributes.put(Exchange.ALTERNATE_EXCHANGE, "my-alternate-exchange"); + attributes.put(Exchange.ALTERNATE_BINDING, + Collections.singletonMap(AlternateBinding.DESTINATION, "my-alternate-exchange")); - responseCode = getRestTestHelper().submitRequest(_exchangeUrl, "PUT", attributes); - assertEquals("Setting of exchange attribites should be allowed", 403, responseCode); + getRestTestHelper().submitRequest(_exchangeUrl, "PUT", attributes, 403); } public void testBindToExchangeAllowed() throws Exception http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java index ae6f7ce..c4271fc 100644 --- a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java +++ b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -22,6 +22,7 @@ package org.apache.qpid.test.unit.client; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,7 +30,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.jms.Connection; -import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -38,14 +38,14 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import javax.jms.Topic; +import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.client.RejectBehaviour; import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.server.virtualhost.AbstractVirtualHost; +import org.apache.qpid.server.model.AlternateBinding; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -71,12 +71,14 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase private static final int MAX_DELIVERY_COUNT = 2; private CountDownLatch _awaitCompletion; - protected long _awaitEmptyQueue; - protected long _awaitCompletionTimeout = 20; + private long _awaitEmptyQueue; + private long _awaitCompletionTimeout = 20; /** index numbers of messages to be redelivered */ private final List<Integer> _redeliverMsgs = Arrays.asList(1, 2, 5, 14); private String _testQueueName; + private Queue _testDeadLetterQueue; + private Queue _testQueue; @Override public void setUp() throws Exception @@ -84,9 +86,6 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase _awaitEmptyQueue = Long.parseLong(System.getProperty("MaxDeliveryCountTest.awaitEmptyQueue", "2500")); _awaitCompletionTimeout = Long.parseLong(System.getProperty("MaxDeliveryCountTest.awaitCompletionTimeout", "20000")); - setTestSystemProperty("queue.deadLetterQueueEnabled","true"); - setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT)); - // Set client-side flag to allow the server to determine if messages // dead-lettered or requeued. if (!isBroker010()) @@ -95,31 +94,28 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase } super.setUp(); _testQueueName = getTestQueueName(); - boolean durableSub = isDurSubTest(); + String testDeadLetterQueueName = _testQueueName + "_DLQ"; Connection connection = getConnectionBuilder().setClientId("clientid").build(); connection.start(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Destination destination; - if(durableSub) - { - destination = createTopic(connection, _testQueueName); - session.createDurableSubscriber((Topic)destination, getName()).close(); - } - else - { - final Map<String, Object> attributes = new HashMap<>(); - attributes.put(org.apache.qpid.server.model.Queue.NAME, _testQueueName); - attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_COUNT); - attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true); - createEntityUsingAmqpManagement(_testQueueName, - session, - "org.apache.qpid.StandardQueue", - attributes); - destination = getQueueFromName(session, _testQueueName); - } - MessageProducer producer = session.createProducer(destination); + _testDeadLetterQueue = createTestQueue(session, testDeadLetterQueueName); + + final Map<String, Object> attributes = new HashMap<>(); + attributes.put(org.apache.qpid.server.model.Queue.NAME, _testQueueName); + attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_COUNT); + attributes.put(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING, + new ObjectMapper().writeValueAsString(Collections.singletonMap(AlternateBinding.DESTINATION, + testDeadLetterQueueName))); + createEntityUsingAmqpManagement(_testQueueName, + session, + "org.apache.qpid.StandardQueue", + attributes); + _testQueue = getQueueFromName(session, _testQueueName); + + + MessageProducer producer = session.createProducer(_testQueue); for (int count = 1; count <= MSG_COUNT; count++) { Message msg = session.createTextMessage(generateContent(count)); @@ -139,96 +135,57 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase return "Message " + count + " content."; } - /** - * Test that Max Redelivery is enforced when using onMessage() on a - * Client-Ack session. - */ public void testAsynchronousClientAckSession() throws Exception { - doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, false, false); + doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, false); } - /** - * Test that Max Redelivery is enforced when using onMessage() on a - * transacted session. - */ public void testAsynchronousTransactedSession() throws Exception { - doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, false); + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false); } - /** - * Test that Max Redelivery is enforced when using onMessage() on an - * Auto-Ack session. - */ public void testAsynchronousAutoAckSession() throws Exception { - doTest(Session.AUTO_ACKNOWLEDGE, _redeliverMsgs, false, false); + doTest(Session.AUTO_ACKNOWLEDGE, _redeliverMsgs, false); } - /** - * Test that Max Redelivery is enforced when using onMessage() on a - * Dups-OK session. - */ public void testAsynchronousDupsOkSession() throws Exception { - doTest(Session.DUPS_OK_ACKNOWLEDGE, _redeliverMsgs, false, false); + doTest(Session.DUPS_OK_ACKNOWLEDGE, _redeliverMsgs, false); } - /** - * Test that Max Redelivery is enforced when using recieve() on a - * Client-Ack session. - */ public void testSynchronousClientAckSession() throws Exception { - doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, true, false); + doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, true); } - /** - * Test that Max Redelivery is enforced when using recieve() on a - * transacted session. - */ public void testSynchronousTransactedSession() throws Exception { - doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false); - } - - public void testDurableSubscription() throws Exception - { - doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, true); + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true); } public void testWhenBrokerIsRestartedAfterEnqeuingMessages() throws Exception { restartDefaultBroker(); - doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false); + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true); } - private void doTest(final int deliveryMode, final List<Integer> redeliverMsgs, final boolean synchronous, final boolean durableSub) throws Exception + private void doTest(final int deliveryMode, + final List<Integer> redeliverMsgs, + final boolean synchronous) throws Exception { final Connection clientConnection = getConnectionBuilder().setClientId("clientid").build(); final boolean transacted = deliveryMode == Session.SESSION_TRANSACTED; final Session clientSession = clientConnection.createSession(transacted, deliveryMode); - MessageConsumer consumer; - Destination dest = durableSub ? clientSession.createTopic(_testQueueName) : clientSession.createQueue(_testQueueName); - Queue checkQueue; - if(durableSub) - { - consumer = clientSession.createDurableSubscriber((Topic)dest, getName()); + MessageConsumer consumer = clientSession.createConsumer(_testQueue); - checkQueue = clientSession.createQueue(getDurableSubscriptionQueueName()); - } - else - { - consumer = clientSession.createConsumer(dest); - checkQueue = (Queue) dest; - } clientConnection.start(); assertEquals("The queue should have " + MSG_COUNT + " msgs at start", - MSG_COUNT, getQueueDepth(clientConnection, checkQueue)); + MSG_COUNT, getQueueDepth(clientConnection, _testQueue)); int expectedDeliveries = MSG_COUNT + ((MAX_DELIVERY_COUNT -1) * redeliverMsgs.size()); @@ -269,17 +226,17 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase && clientSession.getAcknowledgeMode() != Session.CLIENT_ACKNOWLEDGE) { final long timeout = System.currentTimeMillis() + _awaitEmptyQueue; - while(getQueueDepth(clientConnection, checkQueue) > 0 && System.currentTimeMillis() < timeout) + while(getQueueDepth(clientConnection, _testQueue) > 0 && System.currentTimeMillis() < timeout) { Thread.sleep(100); } } //check the source queue is now empty - assertEquals("The queue should have 0 msgs left", 0, getQueueDepth(clientConnection, checkQueue)); + assertEquals("The queue should have 0 msgs left", 0, getQueueDepth(clientConnection, _testQueue)); //check the DLQ has the required number of rejected-without-requeue messages - verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub, clientConnection); + verifyDLQdepth(redeliverMsgs.size(), clientConnection); if (!isBroker10()) { @@ -294,62 +251,33 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase clientConnection2.start(); //verify the messages on the DLQ - verifyDLQcontent(clientConnection2, redeliverMsgs, getTestQueueName(), durableSub); + verifyDLQcontent(clientConnection2, redeliverMsgs); clientConnection2.close(); } else { //verify the messages on the DLQ - verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub); + verifyDLQcontent(clientConnection, redeliverMsgs); clientConnection.close(); } } } - private String getDurableSubscriptionQueueName() + private void verifyDLQdepth(int expectedQueueDepth, final Connection clientConnection) throws Exception { - if ( isBroker10()) - { - return "qpidsub_/clientid_/" + getName() + "_/durable"; - } - else - { - return "clientid:" + getName(); - } - } - - private void verifyDLQdepth(int expected, - Session clientSession, - boolean durableSub, - final Connection clientConnection) throws Exception - { - String queueName = (durableSub ? getDurableSubscriptionQueueName() : _testQueueName ) - + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; - - assertEquals("The DLQ should have " + expected + " msgs on it", - expected, - getQueueDepth(clientConnection, clientSession.createQueue(queueName))); + assertEquals("The DLQ should have " + expectedQueueDepth + " msgs on it", + expectedQueueDepth, + getQueueDepth(clientConnection, _testDeadLetterQueue)); } - private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException + private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs) throws JMSException { Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer; - if(durableSub) - { - String queueName = (durableSub ? getDurableSubscriptionQueueName() : _testQueueName ) - + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; - consumer = clientSession.createConsumer(clientSession.createQueue(queueName)); - } - else - { - consumer = clientSession.createConsumer( - clientSession.createQueue(destName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX)); - } + MessageConsumer consumer = clientSession.createConsumer(_testDeadLetterQueue); //keep track of the message we expect to still be on the DLQ List<Integer> outstandingMessages = new ArrayList<>(redeliverMsgs); @@ -667,9 +595,4 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase } } } - - private boolean isDurSubTest() - { - return getTestQueueName().contains("DurableSubscription"); - } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/test-profiles/Java10Excludes ---------------------------------------------------------------------- diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes index b623a83..f8b4416 100644 --- a/test-profiles/Java10Excludes +++ b/test-profiles/Java10Excludes @@ -128,6 +128,7 @@ org.apache.qpid.test.unit.client.connection.ExceptionListenerTest#testExceptionL // These tests specifically test BURL behaviour org.apache.qpid.server.queue.NodeAutoCreationPolicyTest#testSendingToQueuePatternBURL org.apache.qpid.server.queue.NodeAutoCreationPolicyTest#testSendingToNonMatchingQueuePatternBURL +org.apache.qpid.server.queue.NodeAutoCreationPolicyTest#testLegacyQueueDeclareArgumentAlternateBindingCreation // Message encryption not currently supported by the 1.0 client org.apache.qpid.systest.messageencryption.MessageEncryptionTest#* @@ -204,7 +205,7 @@ org.apache.qpid.server.logging.DurableQueueLoggingTest#* org.apache.qpid.server.logging.QueueLoggingTest#* org.apache.qpid.server.logging.TransientQueueLoggingTest#* -// Tests verify the 0-x client's behaviour on recover which is not applicable to new client +// Tests call Session#recover() to redeliver messages from broker which is not applicable to new client org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testSynchronousClientAckSession org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testAsynchronousClientAckSession org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testAsynchronousDupsOkSession --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
