Repository: qpid-broker-j Updated Branches: refs/heads/master c45aea4c4 -> 4dcbb741b
QPID-8164: Add tests ensuring that sending links to temporary queues/topics from other connections are allowed. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/4dcbb741 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/4dcbb741 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/4dcbb741 Branch: refs/heads/master Commit: 4dcbb741b4e89f109777777154b4f74632f76683 Parents: c45aea4 Author: Keith Wall <[email protected]> Authored: Mon Apr 23 09:23:38 2018 +0100 Committer: Keith Wall <[email protected]> Committed: Mon Apr 23 09:23:59 2018 +0100 ---------------------------------------------------------------------- .../bindmapjms/TemporaryDestinationTest.java | 160 ++++++++++++++----- 1 file changed, 119 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4dcbb741/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java index 4160336..1dd63ea 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java @@ -20,7 +20,6 @@ package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static 
org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -36,7 +35,6 @@ import org.junit.Test; import org.apache.qpid.server.protocol.v1_0.Session_1_0; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; -import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose; import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; @@ -45,7 +43,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; -import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; @@ -58,6 +55,8 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; public class TemporaryDestinationTest extends BrokerAdminUsingTestBase { + private static final Symbol TEMPORARY_QUEUE = Symbol.valueOf("temporary-queue"); + private static final Symbol TEMPORARY_TOPIC = Symbol.valueOf("temporary-topic"); private InetSocketAddress _brokerAddress; @Before @@ -73,7 +72,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase + "dynamic-node-properties field of target containing the “lifetime-policy” symbol key mapped to delete-on-close.") public void deleteOnCloseWithConnectionCloseForQueue() throws Exception { - deleteOnCloseWithConnectionClose(new Symbol[]{Symbol.valueOf("temporary-queue")}); + deleteOnCloseWithConnectionClose(new Symbol[]{TEMPORARY_QUEUE}); } @Test @@ -83,12 +82,12 @@ public class
TemporaryDestinationTest extends BrokerAdminUsingTestBase + "dynamic-node-properties field of target containing the “lifetime-policy” symbol key mapped to delete-on-close.") public void deleteOnCloseWithConnectionCloseForTopic() throws Exception { - deleteOnCloseWithConnectionClose(new Symbol[]{Symbol.valueOf("temporary-topic")}); + deleteOnCloseWithConnectionClose(new Symbol[]{TEMPORARY_TOPIC}); } private void deleteOnCloseWithConnectionClose(final Symbol[] targetCapabilities) throws Exception { - String newTemporaryNodeAddress = null; + String newTemporaryNodeAddress; try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { @@ -136,9 +135,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase + " for the application by supplying a terminus capability for the particular Destination" + " type to which the client expects to attach [...]" + "TemporaryQueue Terminus capability : 'temporary-queue'") - public void canConsumeFormTemporaryQueueCreatedOnTheSameConnection() throws Exception + public void createTemporaryQueueReceivingLink() throws Exception { - final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")}; + final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE}; try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { Target target = createTarget(capabilities); @@ -196,9 +195,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase + " for the application by supplying a terminus capability for the particular Destination" + " type to which the client expects to attach [...]" + "TemporaryQueue Terminus capability : 'temporary-queue'") - public void canNotConsumeFormTemporaryQueueCreatedOnOtherConnection() throws Exception + public void createTemporaryQueueReceivingLinkFromOtherConnectionDisallowed() throws Exception { - final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")}; + final Symbol[] capabilities = new
Symbol[]{TEMPORARY_QUEUE}; try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { Target target = createTarget(capabilities); @@ -222,7 +221,46 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase interaction.consumeResponse().getLatestResponse(Flow.class); - tryToConsume(createSource(newTemporaryNodeAddress, capabilities)); + assertReceivingLinkFails(createSource(newTemporaryNodeAddress, capabilities), AmqpError.RESOURCE_LOCKED); + + interaction.doCloseConnection(); + } + } + + @Test + @SpecificationTest(section = "N/A", + description = "JMS 2.0." + + " 6.2.2. Creating temporary destinations" + + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" + + " that are system - generated uniquely for their connection. Only their own connection" + + " is allowed to create consumer objects for them.") + public void createTemporaryQueueSendingLinkFromOtherConnectionAllowed() throws Exception + { + final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE}; + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + Target target = createTarget(capabilities); + + final Interaction interaction = transport.newInteraction(); + final UnsignedInteger senderHandle = UnsignedInteger.ONE; + final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachHandle(senderHandle) + .attachTarget(target) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + + assertThat(senderAttachResponse.getSource(), is(notNullValue())); + assertThat(senderAttachResponse.getTarget(), is(notNullValue())); + + String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress(); + assertThat(newTemporaryNodeAddress, is(notNullValue())); + + interaction.consumeResponse().getLatestResponse(Flow.class); + + 
assertSendingLinkSucceeds(newTemporaryNodeAddress); interaction.doCloseConnection(); } @@ -247,9 +285,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase + " for the application by supplying a terminus capability for the particular Destination" + " type to which the client expects to attach" + "TemporaryTopic Terminus capability : 'temporary-topic'") - public void canConsumeFormTemporaryTopicCreatedOnTheSameConnection() throws Exception + public void createTemporaryTopicSubscriptionReceivingLink() throws Exception { - final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")}; + final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC}; try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Source source = new Source(); @@ -270,7 +308,6 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase .getLatestResponse(Attach.class); assertThat(receiverAttachResponse.getSource(), is(notNullValue())); - assertThat(receiverAttachResponse.getSource(), is(notNullValue())); String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress(); assertThat(newTemporaryNodeAddress, is(notNullValue())); @@ -289,28 +326,6 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase .consumeResponse(Attach.class) .consumeResponse(Flow.class); - String testData = "testData"; - Disposition responseDisposition = interaction.transferPayloadData(testData) - .transferHandle(senderHandle) - .transfer() - .consumeResponse() - .getLatestResponse(Disposition.class); - assertThat(responseDisposition.getRole(), is(Role.RECEIVER)); - assertThat(responseDisposition.getSettled(), is(Boolean.TRUE)); - assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class))); - - interaction.flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) - .flowOutgoingWindow(UnsignedInteger.ZERO) - .flowNextOutgoingId(UnsignedInteger.ZERO) - 
.flowLinkCredit(UnsignedInteger.ONE) - .flowHandle(receiverHandle) - .flow() - .receiveDelivery() - .decodeLatestDelivery(); - - assertThat(interaction.getDecodedLatestDelivery(), is(equalTo(testData))); - interaction.doCloseConnection(); } } @@ -334,9 +349,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase + " for the application by supplying a terminus capability for the particular Destination" + " type to which the client expects to attach" + "TemporaryTopic Terminus capability : 'temporary-topic'") - public void canNotConsumeFormTemporaryTopicCreatedOnOtherConnection() throws Exception + public void createTemporaryTopicSubscriptionReceivingLinkFromOtherConnectionDisallowed() throws Exception { - final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")}; + final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC}; try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Source source = new Source(); @@ -362,13 +377,53 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress(); assertThat(newTemporaryNodeAddress, is(notNullValue())); - tryToConsume(createSource(newTemporaryNodeAddress, capabilities)); + assertReceivingLinkFails(createSource(newTemporaryNodeAddress, capabilities), AmqpError.RESOURCE_LOCKED); interaction.doCloseConnection(); } } - private void tryToConsume(final Source source) throws Exception + @Test + @SpecificationTest(section = "N/A", + description = "JMS 2.0." + + " 6.2.2. Creating temporary destinations" + + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" + + " that are system - generated uniquely for their connection. 
Only their own connection" + + " is allowed to create consumer objects for them.") + public void createTemporaryTopicSendingLinkFromOtherConnectionAllowed() throws Exception + { + final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC}; + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Source source = new Source(); + source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + source.setCapabilities(capabilities); + source.setDynamic(true); + + final Interaction interaction = transport.newInteraction(); + final UnsignedInteger receiverHandle = UnsignedInteger.ONE; + final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.RECEIVER) + .attachSource(source) + .attachHandle(receiverHandle) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + + assertThat(receiverAttachResponse.getSource(), is(notNullValue())); + + String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress(); + assertThat(newTemporaryNodeAddress, is(notNullValue())); + + assertSendingLinkSucceeds(newTemporaryNodeAddress); + + interaction.doCloseConnection(); + } + } + + private void assertReceivingLinkFails(final Source source, final AmqpError expectedError) throws Exception { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { @@ -387,7 +442,30 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase .getLatestResponse(Detach.class); assertThat(responseDetach.getClosed(), is(true)); assertThat(responseDetach.getError(), is(Matchers.notNullValue())); - assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED))); + assertThat(responseDetach.getError().getCondition(), is(equalTo(expectedError))); + 
interaction.doCloseConnection(); + } + } + + private void assertSendingLinkSucceeds(final String address) throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + Target target = new Target(); + target.setAddress(address); + + final Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol() + .consumeResponse() + .open() + .consumeResponse(Open.class) + .begin() + .consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachTarget(target) + .attach() + .consumeResponse(Attach.class) + .consumeResponse(Flow.class); interaction.doCloseConnection(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
