Repository: qpid-broker-j
Updated Branches:
  refs/heads/master dbd42eaf5 -> c45aea4c4
QPID-8164: Make sure that only own connection consumers can consume from JMS temporary destinations Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c45aea4c Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c45aea4c Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c45aea4c Branch: refs/heads/master Commit: c45aea4c41a7c389c58ede39e8cb8913b25cfab2 Parents: dbd42ea Author: Alex Rudyy <[email protected]> Authored: Wed Apr 18 22:14:11 2018 +0100 Committer: Alex Rudyy <[email protected]> Committed: Thu Apr 19 17:45:49 2018 +0100 ---------------------------------------------------------------------- .../qpid/server/protocol/v1_0/Session_1_0.java | 21 +- .../v1_0/StandardReceivingLinkEndpoint.java | 4 + .../bindmapjms/TemporaryDestinationTest.java | 318 ++++++++++++++++++- .../jms_1_1/queue/TemporaryQueueTest.java | 35 ++ 4 files changed, 366 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index ec875e1..cd8eb83 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -28,6 +28,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import 
java.util.Collections; import java.util.EnumSet; @@ -64,6 +65,7 @@ import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.DestinationAddress; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry; @@ -715,7 +717,10 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget if (Boolean.TRUE.equals(source.getDynamic())) { - MessageSource tempSource = createDynamicSource(link, source.getDynamicNodeProperties()); + final Set<Symbol> sourceCapabilities = source.getCapabilities() == null + ? Collections.emptySet() + : new HashSet<>(Arrays.asList(source.getCapabilities())); + MessageSource tempSource = createDynamicSource(link, source.getDynamicNodeProperties(), sourceCapabilities); if(tempSource != null) { source.setAddress(_primaryDomain + tempSource.getName()); @@ -789,7 +794,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget return exchangeDestination; } - private MessageSource createDynamicSource(final Link_1_0<?, ?> link, Map properties) throws AmqpErrorException + private MessageSource createDynamicSource(final Link_1_0<?, ?> link, + Map properties, + final Set<Symbol> capabilities) throws AmqpErrorException { // TODO temporary topics? 
final String queueName = "TempQueue" + UUID.randomUUID().toString(); @@ -797,6 +804,12 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget { Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(link, properties, queueName); + if (capabilities.contains(Symbol.valueOf("temporary-queue")) + || capabilities.contains(Symbol.valueOf("temporary-topic"))) + { + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION); + } + return Subject.doAs(getSubjectWithAddedSystemRights(), (PrivilegedAction<MessageSource>) () -> getAddressSpace().createMessageSource(MessageSource.class, attributes)); } @@ -829,6 +842,10 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget { attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); } + else if (capabilitySet.contains(Symbol.valueOf("temporary-queue"))) + { + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION); + } return Subject.doAs(getSubjectWithAddedSystemRights(), (PrivilegedAction<MessageDestination>) () -> getAddressSpace().createMessageDestination(clazz, attributes)); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java index 91ab75b..cc689b9 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java @@ -439,6 +439,10 @@ public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint { targetCapabilities.add(Symbol.valueOf("temporary-topic")); } + if (desiredCapabilities.contains(Symbol.valueOf("temporary-queue"))) + { + targetCapabilities.add(Symbol.valueOf("temporary-queue")); + } if (desiredCapabilities.contains(Symbol.valueOf("topic"))) { targetCapabilities.add(Symbol.valueOf("topic")); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java index a3270a0..4160336 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java @@ -20,32 +20,41 @@ package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import java.net.InetSocketAddress; import java.util.Collections; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Test; import org.apache.qpid.server.protocol.v1_0.Session_1_0; import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import 
org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; +import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; -import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; -import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.Utils; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; public class TemporaryDestinationTest extends BrokerAdminUsingTestBase { @@ -83,11 +92,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { - Target target = new Target(); - target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); - target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); - target.setDynamic(true); - target.setCapabilities(targetCapabilities); + Target target = createTarget(targetCapabilities); final Interaction interaction = transport.newInteraction(); final Attach attachResponse = 
interaction.negotiateProtocol().consumeResponse() @@ -104,8 +109,6 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase newTemporaryNodeAddress = ((Target) attachResponse.getTarget()).getAddress(); assertThat(newTemporaryNodeAddress, is(notNullValue())); - assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(true)); - interaction.consumeResponse().getLatestResponse(Flow.class); interaction.doCloseConnection(); @@ -113,4 +116,299 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false)); } + + + @Test + @SpecificationTest(section = "N/A", + description = "JMS 2.0." + + " 6.2.2. Creating temporary destinations" + + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" + + " that are system - generated uniquely for their connection. Only their own connection" + + " is allowed to create consumer objects for them." + + "" + + "4.1.5. TemporaryQueue" + + "A TemporaryQueue is a unique Queue object created for the duration of a connection." + + " It is a system-defined queue that can only be consumed by the connection that created it." + + "" + + "AMQP JMS Mapping." + + " 5.2. Destinations And Producers/Consumers" + + "[...] 
type information SHOULD be conveyed when creating producer or consumer links" + + " for the application by supplying a terminus capability for the particular Destination" + + " type to which the client expects to attach [...]" + + "TemporaryQueue Terminus capability : 'temporary-queue'") + public void canConsumeFormTemporaryQueueCreatedOnTheSameConnection() throws Exception + { + final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")}; + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + Target target = createTarget(capabilities); + + final Interaction interaction = transport.newInteraction(); + final UnsignedInteger senderHandle = UnsignedInteger.ONE; + final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachHandle(senderHandle) + .attachTarget(target) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + + assertThat(senderAttachResponse.getSource(), is(notNullValue())); + assertThat(senderAttachResponse.getTarget(), is(notNullValue())); + + String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress(); + assertThat(newTemporaryNodeAddress, is(notNullValue())); + + interaction.consumeResponse().getLatestResponse(Flow.class); + + final Attach receiverAttachResponse = interaction.attachRole(Role.RECEIVER) + .attachSource(createSource(newTemporaryNodeAddress, + capabilities)) + .attachHandle(UnsignedInteger.valueOf(2)) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + + assertThat(receiverAttachResponse.getSource(), is(notNullValue())); + assertThat(receiverAttachResponse.getSource(), is(notNullValue())); + assertThat(((Source) receiverAttachResponse.getSource()).getAddress(), + is(equalTo(newTemporaryNodeAddress))); + + interaction.doCloseConnection(); + } + } + + @Test + @SpecificationTest(section = "N/A", + 
description = "JMS 2.0." + + " 6.2.2. Creating temporary destinations" + + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" + + " that are system - generated uniquely for their connection. Only their own connection" + + " is allowed to create consumer objects for them." + + "" + + "4.1.5. TemporaryQueue" + + "A TemporaryQueue is a unique Queue object created for the duration of a connection." + + " It is a system-defined queue that can only be consumed by the connection that created it." + + "" + + "AMQP JMS Mapping." + + " 5.2. Destinations And Producers/Consumers" + + "[...] type information SHOULD be conveyed when creating producer or consumer links" + + " for the application by supplying a terminus capability for the particular Destination" + + " type to which the client expects to attach [...]" + + "TemporaryQueue Terminus capability : 'temporary-queue'") + public void canNotConsumeFormTemporaryQueueCreatedOnOtherConnection() throws Exception + { + final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")}; + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + Target target = createTarget(capabilities); + + final Interaction interaction = transport.newInteraction(); + final UnsignedInteger senderHandle = UnsignedInteger.ONE; + final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachHandle(senderHandle) + .attachTarget(target) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + + assertThat(senderAttachResponse.getSource(), is(notNullValue())); + assertThat(senderAttachResponse.getTarget(), is(notNullValue())); + + String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress(); + assertThat(newTemporaryNodeAddress, is(notNullValue())); + + 
interaction.consumeResponse().getLatestResponse(Flow.class); + + tryToConsume(createSource(newTemporaryNodeAddress, capabilities)); + + interaction.doCloseConnection(); + } + } + + @Test + @SpecificationTest(section = "N/A", + description = "JMS 2.0." + + " 6.2.2. Creating temporary destinations" + + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" + + " that are system - generated uniquely for their connection. Only their own connection" + + " is allowed to create consumer objects for them." + + "" + + "4.2.7. Temporary topics" + + "A TemporaryTopic is a unique Topic object created for the duration of a JMSContext," + + " Connection or TopicConnection . It is a system defined Topic whose messages may be" + + " consumed only by the connection that created it." + + "" + + "AMQP JMS Mapping." + + " 5.2. Destinations And Producers/Consumers" + + "[...] type information SHOULD be conveyed when creating producer or consumer links" + + " for the application by supplying a terminus capability for the particular Destination" + + " type to which the client expects to attach" + + "TemporaryTopic Terminus capability : 'temporary-topic'") + public void canConsumeFormTemporaryTopicCreatedOnTheSameConnection() throws Exception + { + final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")}; + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Source source = new Source(); + source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + source.setCapabilities(capabilities); + source.setDynamic(true); + + final Interaction interaction = transport.newInteraction(); + final UnsignedInteger receiverHandle = UnsignedInteger.ONE; + final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + 
.begin().consumeResponse(Begin.class) + .attachRole(Role.RECEIVER) + .attachSource(source) + .attachHandle(receiverHandle) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + + assertThat(receiverAttachResponse.getSource(), is(notNullValue())); + assertThat(receiverAttachResponse.getSource(), is(notNullValue())); + + String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress(); + assertThat(newTemporaryNodeAddress, is(notNullValue())); + + Target target = new Target(); + target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); + target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + target.setCapabilities(capabilities); + target.setAddress(newTemporaryNodeAddress); + + final UnsignedInteger senderHandle = UnsignedInteger.valueOf(2); + interaction.attachRole(Role.SENDER) + .attachHandle(senderHandle) + .attachTarget(target) + .attach() + .consumeResponse(Attach.class) + .consumeResponse(Flow.class); + + String testData = "testData"; + Disposition responseDisposition = interaction.transferPayloadData(testData) + .transferHandle(senderHandle) + .transfer() + .consumeResponse() + .getLatestResponse(Disposition.class); + assertThat(responseDisposition.getRole(), is(Role.RECEIVER)); + assertThat(responseDisposition.getSettled(), is(Boolean.TRUE)); + assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class))); + + interaction.flowIncomingWindow(UnsignedInteger.ONE) + .flowNextIncomingId(UnsignedInteger.ZERO) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowLinkCredit(UnsignedInteger.ONE) + .flowHandle(receiverHandle) + .flow() + .receiveDelivery() + .decodeLatestDelivery(); + + assertThat(interaction.getDecodedLatestDelivery(), is(equalTo(testData))); + + interaction.doCloseConnection(); + } + } + + @Test + @SpecificationTest(section = "N/A", + description = "JMS 2.0." + + " 6.2.2. 
Creating temporary destinations" + + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" + + " that are system - generated uniquely for their connection. Only their own connection" + + " is allowed to create consumer objects for them." + + "" + + "4.2.7. Temporary topics" + + "A TemporaryTopic is a unique Topic object created for the duration of a JMSContext," + + " Connection or TopicConnection . It is a system defined Topic whose messages may be" + + " consumed only by the connection that created it." + + "" + + "AMQP JMS Mapping." + + " 5.2. Destinations And Producers/Consumers" + + "[...] type information SHOULD be conveyed when creating producer or consumer links" + + " for the application by supplying a terminus capability for the particular Destination" + + " type to which the client expects to attach" + + "TemporaryTopic Terminus capability : 'temporary-topic'") + public void canNotConsumeFormTemporaryTopicCreatedOnOtherConnection() throws Exception + { + final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")}; + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Source source = new Source(); + source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + source.setCapabilities(capabilities); + source.setDynamic(true); + + final Interaction interaction = transport.newInteraction(); + final UnsignedInteger receiverHandle = UnsignedInteger.ONE; + final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.RECEIVER) + .attachSource(source) + .attachHandle(receiverHandle) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + + assertThat(receiverAttachResponse.getSource(), is(notNullValue())); + 
assertThat(receiverAttachResponse.getSource(), is(notNullValue())); + + String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress(); + assertThat(newTemporaryNodeAddress, is(notNullValue())); + + tryToConsume(createSource(newTemporaryNodeAddress, capabilities)); + + interaction.doCloseConnection(); + } + } + + private void tryToConsume(final Source source) throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + final Detach responseDetach = interaction.negotiateProtocol() + .consumeResponse() + .open() + .consumeResponse(Open.class) + .begin() + .consumeResponse(Begin.class) + .attachRole(Role.RECEIVER) + .attachSource(source) + .attach() + .consumeResponse(Attach.class) + .consumeResponse(Detach.class) + .getLatestResponse(Detach.class); + assertThat(responseDetach.getClosed(), is(true)); + assertThat(responseDetach.getError(), is(Matchers.notNullValue())); + assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED))); + interaction.doCloseConnection(); + } + } + + private Target createTarget(final Symbol[] capabilities) + { + Target target = new Target(); + target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); + target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + target.setDynamic(true); + target.setCapabilities(capabilities); + return target; + } + + private Source createSource(final String name, final Symbol[] capabilities) + { + final Source source = new Source(); + source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + source.setCapabilities(capabilities); + source.setAddress(name); + return source; + } } 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
index f7b8c18..833d7d6 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
@@ -100,6 +100,41 @@ public class TemporaryQueueTest extends JmsTestBase
     }
 
     @Test
+    public void testConsumeFromAnotherConnectionUsingTemporaryQueueName() throws Exception
+    {
+        final Connection connection = getConnection();
+        try
+        {
+            final Connection connection2 = getConnection();
+            try
+            {
+                final Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final TemporaryQueue queue = session1.createTemporaryQueue();
+                assertNotNull("Temporary queue cannot be null", queue);
+
+                try
+                {
+                    session2.createConsumer(session2.createQueue(queue.getQueueName()));
+                    fail("Expected a JMSException when subscribing to a temporary queue created on a different session");
+                }
+                catch (JMSException je)
+                {
+                    //pass
+                }
+            }
+            finally
+            {
+                connection2.close();
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    @Test
     public void testPublishFromAnotherConnectionAllowed() throws Exception
     {
         final Connection connection = getConnection();

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
