This is an automated email from the ASF dual-hosted git repository.

rgodfrey pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new d31f04f92b QPID-8571 : Non-unique consumer tags created for AMQP 0-9-1
d31f04f92b is described below

commit d31f04f92b3cf59078f6806312069589c92c3b85
Author: rgodfrey <[email protected]>
AuthorDate: Fri Jan 31 11:55:25 2025 +0100

    QPID-8571 : Non-unique consumer tags created for AMQP 0-9-1
---
 .../qpid/server/protocol/v0_8/AMQChannel.java      | 14 +++++++-
 .../apache/qpid/tests/protocol/v0_8/BasicTest.java | 38 +++++++++++++++++++---
 2 files changed, 47 insertions(+), 5 deletions(-)

diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 688f685b79..3c4a14a219 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -134,6 +134,7 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
     /** Maps from consumer tag to subscription instance. Allows us to 
unsubscribe from a queue. */
     private final Map<AMQShortString, ConsumerTarget_0_8> 
_tag2SubscriptionTargetMap = new HashMap<>();
+    private final Set<AMQShortString> _nonGeneratedTags = new HashSet<>();
 
     private final MessageStore _messageStore;
 
@@ -567,7 +568,16 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
     {
         if (tag == null)
         {
-            tag = AMQShortString.createAMQShortString("sgen_" + 
getNextConsumerTag());
+            do {
+                tag = AMQShortString.createAMQShortString("sgen_" + 
getNextConsumerTag());
+            }
+            while(_nonGeneratedTags.contains(tag));
+        }
+        else {
+            if(!_nonGeneratedTags.add(tag)) {
+                throw new ConsumerTagInUseException("Consumer already exists 
with same tag: " + tag);
+            }
+
         }
 
         if (_tag2SubscriptionTargetMap.containsKey(tag))
@@ -699,6 +709,7 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                 | MessageSource.ConsumerAccessRefused e)
         {
             _tag2SubscriptionTargetMap.remove(tag);
+            _nonGeneratedTags.remove(tag);
             throw e;
         }
         return tag;
@@ -716,6 +727,7 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
             LOGGER.debug("Unsubscribing consumer '{}' on channel {}", 
consumerTag, this);
         }
 
+        _nonGeneratedTags.remove(consumerTag);
         ConsumerTarget_0_8 target = 
_tag2SubscriptionTargetMap.remove(consumerTag);
         if (target != null)
         {
diff --git 
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
 
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index d8242eb752..dd7e8ed779 100644
--- 
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++ 
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -20,10 +20,7 @@
  */
 package org.apache.qpid.tests.protocol.v0_8;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.*;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
@@ -292,6 +289,39 @@ public class BasicTest extends BrokerAdminUsingTestBase
         }
     }
 
+    @Test
+    @SpecificationTest(section = "1.8.3.3", description = "If no consumer tag 
is given then the server should generate " +
+            "a unique tag, not clashing with an existing tag")
+    public void serverGeneratedTagUniqueness() throws Exception
+    {
+        try(FrameTransport transport = new 
FrameTransport(getBrokerAdmin()).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+
+
+            String queueName = BrokerAdmin.TEST_QUEUE_NAME;
+            String consumerTag = "sgen_1";
+            interaction.negotiateOpen()
+                    .channel().open()
+                    .consumeResponse(ChannelOpenOkBody.class)
+                    .basic().qosPrefetchCount(1)
+                    .qos()
+                    .consumeResponse(BasicQosOkBody.class)
+                    .basic().consumeConsumerTag(consumerTag)
+                    .consumeQueue(queueName)
+                    .consume()
+                    .consumeResponse(BasicConsumeOkBody.class)
+                    .channel().flow(true)
+                    .consumeResponse(ChannelFlowOkBody.class);
+
+            BasicConsumeOkBody consumeOK = 
interaction.basic().consumeConsumerTag("")
+                    .consumeQueue(queueName)
+                    .consume()
+                    
.consumeResponse(BasicConsumeOkBody.class).getLatestResponse(BasicConsumeOkBody.class);
+            assertThat(consumeOK.getConsumerTag().toString(), 
is(not(equalTo(consumerTag))));
+        }
+    }
+
     @Test
     @SpecificationTest(section = "1.8.3.13",
             description = "The server MUST validate that a non-zero 
delivery-tag refers to a delivered message,"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to