Repository: qpid-broker-j
Updated Branches:
  refs/heads/master dbd42eaf5 -> c45aea4c4


QPID-8164: Make sure that only consumers on the creating connection can consume
from JMS temporary destinations


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c45aea4c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c45aea4c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c45aea4c

Branch: refs/heads/master
Commit: c45aea4c41a7c389c58ede39e8cb8913b25cfab2
Parents: dbd42ea
Author: Alex Rudyy <[email protected]>
Authored: Wed Apr 18 22:14:11 2018 +0100
Committer: Alex Rudyy <[email protected]>
Committed: Thu Apr 19 17:45:49 2018 +0100

----------------------------------------------------------------------
 .../qpid/server/protocol/v1_0/Session_1_0.java  |  21 +-
 .../v1_0/StandardReceivingLinkEndpoint.java     |   4 +
 .../bindmapjms/TemporaryDestinationTest.java    | 318 ++++++++++++++++++-
 .../jms_1_1/queue/TemporaryQueueTest.java       |  35 ++
 4 files changed, 366 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ec875e1..cd8eb83 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -28,6 +28,7 @@ import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -64,6 +65,7 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.DestinationAddress;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
@@ -715,7 +717,10 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
 
         if (Boolean.TRUE.equals(source.getDynamic()))
         {
-            MessageSource tempSource = createDynamicSource(link, 
source.getDynamicNodeProperties());
+            final Set<Symbol> sourceCapabilities = source.getCapabilities() == 
null
+                    ? Collections.emptySet()
+                    : new HashSet<>(Arrays.asList(source.getCapabilities()));
+            MessageSource tempSource = createDynamicSource(link, 
source.getDynamicNodeProperties(), sourceCapabilities);
             if(tempSource != null)
             {
                 source.setAddress(_primaryDomain + tempSource.getName());
@@ -789,7 +794,9 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
         return exchangeDestination;
     }
 
-    private MessageSource createDynamicSource(final Link_1_0<?, ?> link, Map 
properties) throws AmqpErrorException
+    private MessageSource createDynamicSource(final Link_1_0<?, ?> link,
+                                              Map properties,
+                                              final Set<Symbol> capabilities) 
throws AmqpErrorException
     {
         // TODO temporary topics?
         final String queueName = "TempQueue" + UUID.randomUUID().toString();
@@ -797,6 +804,12 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
         {
             Map<String, Object> attributes = 
convertDynamicNodePropertiesToAttributes(link, properties, queueName);
 
+            if (capabilities.contains(Symbol.valueOf("temporary-queue"))
+                || capabilities.contains(Symbol.valueOf("temporary-topic")))
+            {
+                attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+            }
+
             return Subject.doAs(getSubjectWithAddedSystemRights(),
                                 (PrivilegedAction<MessageSource>) () -> 
getAddressSpace().createMessageSource(MessageSource.class, attributes));
         }
@@ -829,6 +842,10 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
             {
                 attributes.put(Exchange.TYPE, 
ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
             }
+            else if (capabilitySet.contains(Symbol.valueOf("temporary-queue")))
+            {
+                attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+            }
 
             return Subject.doAs(getSubjectWithAddedSystemRights(),
                                 (PrivilegedAction<MessageDestination>) () -> 
getAddressSpace().createMessageDestination(clazz, attributes));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 91ab75b..cc689b9 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -439,6 +439,10 @@ public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint
             {
                 targetCapabilities.add(Symbol.valueOf("temporary-topic"));
             }
+            if 
(desiredCapabilities.contains(Symbol.valueOf("temporary-queue")))
+            {
+                targetCapabilities.add(Symbol.valueOf("temporary-queue"));
+            }
             if (desiredCapabilities.contains(Symbol.valueOf("topic")))
             {
                 targetCapabilities.add(Symbol.valueOf("topic"));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index a3270a0..4160336 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -20,32 +20,41 @@
 
 package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
 
+import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.Session_1_0;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import 
org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
 public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
 {
@@ -83,11 +92,7 @@ public class TemporaryDestinationTest extends 
BrokerAdminUsingTestBase
 
         try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
         {
-            Target target = new Target();
-            
target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY,
 new DeleteOnClose()));
-            target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
-            target.setDynamic(true);
-            target.setCapabilities(targetCapabilities);
+            Target target = createTarget(targetCapabilities);
 
             final Interaction interaction = transport.newInteraction();
             final Attach attachResponse = 
interaction.negotiateProtocol().consumeResponse()
@@ -104,8 +109,6 @@ public class TemporaryDestinationTest extends 
BrokerAdminUsingTestBase
             newTemporaryNodeAddress = ((Target) 
attachResponse.getTarget()).getAddress();
             assertThat(newTemporaryNodeAddress, is(notNullValue()));
 
-            assertThat(Utils.doesNodeExist(_brokerAddress, 
newTemporaryNodeAddress), is(true));
-
             interaction.consumeResponse().getLatestResponse(Flow.class);
 
             interaction.doCloseConnection();
@@ -113,4 +116,299 @@ public class TemporaryDestinationTest extends 
BrokerAdminUsingTestBase
 
         assertThat(Utils.doesNodeExist(_brokerAddress, 
newTemporaryNodeAddress), is(false));
     }
+
+
+    @Test
+    @SpecificationTest(section = "N/A",
+            description = "JMS 2.0."
+                          + " 6.2.2. Creating temporary destinations"
+                          + "Temporary destinations ( TemporaryQueue or  
TemporaryTopic objects) are destinations"
+                          + " that are system - generated uniquely for their 
connection.  Only their own connection"
+                          + " is allowed to create consumer objects for them."
+                          + ""
+                          + "4.1.5. TemporaryQueue"
+                          + "A TemporaryQueue is a unique Queue object created 
for the duration of a connection."
+                          + " It is a system-defined queue that can only be 
consumed by the connection that created it."
+                          + ""
+                          + "AMQP JMS Mapping."
+                          + " 5.2. Destinations And Producers/Consumers"
+                          + "[...] type information SHOULD be conveyed when 
creating producer or consumer links"
+                          + " for the application by supplying a terminus 
capability for the particular Destination"
+                          + " type to which the client expects to attach [...]"
+                          + "TemporaryQueue Terminus capability : 
'temporary-queue'")
+    public void canConsumeFormTemporaryQueueCreatedOnTheSameConnection() 
throws Exception
+    {
+        final Symbol[] capabilities = new 
Symbol[]{Symbol.valueOf("temporary-queue")};
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            Target target = createTarget(capabilities);
+
+            final Interaction interaction = transport.newInteraction();
+            final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+            final Attach senderAttachResponse = 
interaction.negotiateProtocol().consumeResponse()
+                                                           
.open().consumeResponse(Open.class)
+                                                           
.begin().consumeResponse(Begin.class)
+                                                           
.attachRole(Role.SENDER)
+                                                           
.attachHandle(senderHandle)
+                                                           
.attachTarget(target)
+                                                           
.attach().consumeResponse()
+                                                           
.getLatestResponse(Attach.class);
+
+            assertThat(senderAttachResponse.getSource(), is(notNullValue()));
+            assertThat(senderAttachResponse.getTarget(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Target) 
senderAttachResponse.getTarget()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            interaction.consumeResponse().getLatestResponse(Flow.class);
+
+            final Attach receiverAttachResponse = 
interaction.attachRole(Role.RECEIVER)
+                                                             
.attachSource(createSource(newTemporaryNodeAddress,
+                                                                               
         capabilities))
+                                                             
.attachHandle(UnsignedInteger.valueOf(2))
+                                                             
.attach().consumeResponse()
+                                                             
.getLatestResponse(Attach.class);
+
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+            assertThat(((Source) 
receiverAttachResponse.getSource()).getAddress(),
+                       is(equalTo(newTemporaryNodeAddress)));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "N/A",
+            description = "JMS 2.0."
+                          + " 6.2.2. Creating temporary destinations"
+                          + "Temporary destinations ( TemporaryQueue or  
TemporaryTopic objects) are destinations"
+                          + " that are system - generated uniquely for their 
connection.  Only their own connection"
+                          + " is allowed to create consumer objects for them."
+                          + ""
+                          + "4.1.5. TemporaryQueue"
+                          + "A TemporaryQueue is a unique Queue object created 
for the duration of a connection."
+                          + " It is a system-defined queue that can only be 
consumed by the connection that created it."
+                          + ""
+                          + "AMQP JMS Mapping."
+                          + " 5.2. Destinations And Producers/Consumers"
+                          + "[...] type information SHOULD be conveyed when 
creating producer or consumer links"
+                          + " for the application by supplying a terminus 
capability for the particular Destination"
+                          + " type to which the client expects to attach [...]"
+                          + "TemporaryQueue Terminus capability : 
'temporary-queue'")
+    public void canNotConsumeFormTemporaryQueueCreatedOnOtherConnection() 
throws Exception
+    {
+        final Symbol[] capabilities = new 
Symbol[]{Symbol.valueOf("temporary-queue")};
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            Target target = createTarget(capabilities);
+
+            final Interaction interaction = transport.newInteraction();
+            final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+            final Attach senderAttachResponse = 
interaction.negotiateProtocol().consumeResponse()
+                                                           
.open().consumeResponse(Open.class)
+                                                           
.begin().consumeResponse(Begin.class)
+                                                           
.attachRole(Role.SENDER)
+                                                           
.attachHandle(senderHandle)
+                                                           
.attachTarget(target)
+                                                           
.attach().consumeResponse()
+                                                           
.getLatestResponse(Attach.class);
+
+            assertThat(senderAttachResponse.getSource(), is(notNullValue()));
+            assertThat(senderAttachResponse.getTarget(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Target) 
senderAttachResponse.getTarget()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            interaction.consumeResponse().getLatestResponse(Flow.class);
+
+            tryToConsume(createSource(newTemporaryNodeAddress, capabilities));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "N/A",
+            description = "JMS 2.0."
+                          + " 6.2.2. Creating temporary destinations"
+                          + "Temporary destinations ( TemporaryQueue or  
TemporaryTopic objects) are destinations"
+                          + " that are system - generated uniquely for their 
connection.  Only their own connection"
+                          + " is allowed to create consumer objects for them."
+                          + ""
+                          + "4.2.7. Temporary topics"
+                          + "A TemporaryTopic is a unique Topic object created 
for the duration of a JMSContext,"
+                          + " Connection or TopicConnection . It is a system 
defined Topic whose messages may be"
+                          + " consumed only by the connection that created it."
+                          + ""
+                          + "AMQP JMS Mapping."
+                          + " 5.2. Destinations And Producers/Consumers"
+                          + "[...] type information SHOULD be conveyed when 
creating producer or consumer links"
+                          + " for the application by supplying a terminus 
capability for the particular Destination"
+                          + " type to which the client expects to attach"
+                          + "TemporaryTopic Terminus capability : 
'temporary-topic'")
+    public void canConsumeFormTemporaryTopicCreatedOnTheSameConnection() 
throws Exception
+    {
+        final Symbol[] capabilities = new 
Symbol[]{Symbol.valueOf("temporary-topic")};
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Source source = new Source();
+            
source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY,
 new DeleteOnClose()));
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            source.setCapabilities(capabilities);
+            source.setDynamic(true);
+
+            final Interaction interaction = transport.newInteraction();
+            final UnsignedInteger receiverHandle = UnsignedInteger.ONE;
+            final Attach receiverAttachResponse = 
interaction.negotiateProtocol().consumeResponse()
+                                                             
.open().consumeResponse(Open.class)
+                                                             
.begin().consumeResponse(Begin.class)
+                                                             
.attachRole(Role.RECEIVER)
+                                                             
.attachSource(source)
+                                                             
.attachHandle(receiverHandle)
+                                                             
.attach().consumeResponse()
+                                                             
.getLatestResponse(Attach.class);
+
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Source) 
receiverAttachResponse.getSource()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            Target target = new Target();
+            
target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY,
 new DeleteOnClose()));
+            target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            target.setCapabilities(capabilities);
+            target.setAddress(newTemporaryNodeAddress);
+
+            final UnsignedInteger senderHandle = UnsignedInteger.valueOf(2);
+            interaction.attachRole(Role.SENDER)
+                       .attachHandle(senderHandle)
+                       .attachTarget(target)
+                       .attach()
+                       .consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            String testData = "testData";
+            Disposition responseDisposition = 
interaction.transferPayloadData(testData)
+                                                         
.transferHandle(senderHandle)
+                                                         .transfer()
+                                                         .consumeResponse()
+                                                         
.getLatestResponse(Disposition.class);
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+            assertThat(responseDisposition.getState(), 
is(instanceOf(Accepted.class)));
+
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandle(receiverHandle)
+                       .flow()
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+
+            assertThat(interaction.getDecodedLatestDelivery(), 
is(equalTo(testData)));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "N/A",
+            description = "JMS 2.0."
+                          + " 6.2.2. Creating temporary destinations"
+                          + "Temporary destinations ( TemporaryQueue or  
TemporaryTopic objects) are destinations"
+                          + " that are system - generated uniquely for their 
connection.  Only their own connection"
+                          + " is allowed to create consumer objects for them."
+                          + ""
+                          + "4.2.7. Temporary topics"
+                          + "A TemporaryTopic is a unique Topic object created 
for the duration of a JMSContext,"
+                          + " Connection or TopicConnection . It is a system 
defined Topic whose messages may be"
+                          + " consumed only by the connection that created it."
+                          + ""
+                          + "AMQP JMS Mapping."
+                          + " 5.2. Destinations And Producers/Consumers"
+                          + "[...] type information SHOULD be conveyed when 
creating producer or consumer links"
+                          + " for the application by supplying a terminus 
capability for the particular Destination"
+                          + " type to which the client expects to attach"
+                          + "TemporaryTopic Terminus capability : 
'temporary-topic'")
+    public void canNotConsumeFormTemporaryTopicCreatedOnOtherConnection() 
throws Exception
+    {
+        final Symbol[] capabilities = new 
Symbol[]{Symbol.valueOf("temporary-topic")};
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Source source = new Source();
+            
source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY,
 new DeleteOnClose()));
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            source.setCapabilities(capabilities);
+            source.setDynamic(true);
+
+            final Interaction interaction = transport.newInteraction();
+            final UnsignedInteger receiverHandle = UnsignedInteger.ONE;
+            final Attach receiverAttachResponse = 
interaction.negotiateProtocol().consumeResponse()
+                                                             
.open().consumeResponse(Open.class)
+                                                             
.begin().consumeResponse(Begin.class)
+                                                             
.attachRole(Role.RECEIVER)
+                                                             
.attachSource(source)
+                                                             
.attachHandle(receiverHandle)
+                                                             
.attach().consumeResponse()
+                                                             
.getLatestResponse(Attach.class);
+
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Source) 
receiverAttachResponse.getSource()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            tryToConsume(createSource(newTemporaryNodeAddress, capabilities));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    private void tryToConsume(final Source source) throws Exception
+    {
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final Detach responseDetach = interaction.negotiateProtocol()
+                                                     .consumeResponse()
+                                                     .open()
+                                                     
.consumeResponse(Open.class)
+                                                     .begin()
+                                                     
.consumeResponse(Begin.class)
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSource(source)
+                                                     .attach()
+                                                     
.consumeResponse(Attach.class)
+                                                     
.consumeResponse(Detach.class)
+                                                     
.getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(true));
+            assertThat(responseDetach.getError(), is(Matchers.notNullValue()));
+            assertThat(responseDetach.getError().getCondition(), 
is(equalTo(AmqpError.RESOURCE_LOCKED)));
+            interaction.doCloseConnection();
+        }
+    }
+
+    private Target createTarget(final Symbol[] capabilities)
+    {
+        Target target = new Target();
+        
target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY,
 new DeleteOnClose()));
+        target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+        target.setDynamic(true);
+        target.setCapabilities(capabilities);
+        return target;
+    }
+
+    private Source createSource(final String name, final Symbol[] capabilities)
+    {
+        final Source source = new Source();
+        
source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY,
 new DeleteOnClose()));
+        source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+        source.setCapabilities(capabilities);
+        source.setAddress(name);
+        return source;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
----------------------------------------------------------------------
diff --git 
a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
 
b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
index f7b8c18..833d7d6 100644
--- 
a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
+++ 
b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
@@ -100,6 +100,41 @@ public class TemporaryQueueTest extends JmsTestBase
     }
 
     @Test
+    public void testConsumeFromAnotherConnectionUsingTemporaryQueueName() 
throws Exception
+    {
+        final Connection connection = getConnection();
+        try
+        {
+            final Connection connection2 = getConnection();
+            try
+            {
+                final Session session1 = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                final Session session2 = connection2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                final TemporaryQueue queue = session1.createTemporaryQueue();
+                assertNotNull("Temporary queue cannot be null", queue);
+
+                try
+                {
+                    
session2.createConsumer(session2.createQueue(queue.getQueueName()));
+                    fail("Expected a JMSException when subscribing to a 
temporary queue created on a different session");
+                }
+                catch (JMSException je)
+                {
+                    //pass
+                }
+            }
+            finally
+            {
+                connection2.close();
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    @Test
     public void testPublishFromAnotherConnectionAllowed() throws Exception
     {
         final Connection connection = getConnection();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to