This is an automated email from the ASF dual-hosted git repository.
rgodfrey pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new d31f04f92b QPID-8571 : Non-unique consumer tags created for AMQP 0-9-1
d31f04f92b is described below
commit d31f04f92b3cf59078f6806312069589c92c3b85
Author: rgodfrey <[email protected]>
AuthorDate: Fri Jan 31 11:55:25 2025 +0100
QPID-8571 : Non-unique consumer tags created for AMQP 0-9-1
---
.../qpid/server/protocol/v0_8/AMQChannel.java | 14 +++++++-
.../apache/qpid/tests/protocol/v0_8/BasicTest.java | 38 +++++++++++++++++++---
2 files changed, 47 insertions(+), 5 deletions(-)
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 688f685b79..3c4a14a219 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -134,6 +134,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
/** Maps from consumer tag to subscription instance. Allows us to
unsubscribe from a queue. */
private final Map<AMQShortString, ConsumerTarget_0_8>
_tag2SubscriptionTargetMap = new HashMap<>();
+ private final Set<AMQShortString> _nonGeneratedTags = new HashSet<>();
private final MessageStore _messageStore;
@@ -567,7 +568,16 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
{
if (tag == null)
{
- tag = AMQShortString.createAMQShortString("sgen_" +
getNextConsumerTag());
+ do {
+ tag = AMQShortString.createAMQShortString("sgen_" +
getNextConsumerTag());
+ }
+ while(_nonGeneratedTags.contains(tag));
+ }
+ else {
+ if(!_nonGeneratedTags.add(tag)) {
+ throw new ConsumerTagInUseException("Consumer already exists
with same tag: " + tag);
+ }
+
}
if (_tag2SubscriptionTargetMap.containsKey(tag))
@@ -699,6 +709,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
| MessageSource.ConsumerAccessRefused e)
{
_tag2SubscriptionTargetMap.remove(tag);
+ _nonGeneratedTags.remove(tag);
throw e;
}
return tag;
@@ -716,6 +727,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
LOGGER.debug("Unsubscribing consumer '{}' on channel {}",
consumerTag, this);
}
+ _nonGeneratedTags.remove(consumerTag);
ConsumerTarget_0_8 target =
_tag2SubscriptionTargetMap.remove(consumerTag);
if (target != null)
{
diff --git
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index d8242eb752..dd7e8ed779 100644
---
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -20,10 +20,7 @@
*/
package org.apache.qpid.tests.protocol.v0_8;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
@@ -292,6 +289,39 @@ public class BasicTest extends BrokerAdminUsingTestBase
}
}
+ @Test
+ @SpecificationTest(section = "1.8.3.3", description = "If no consumer tag
is given then the server should generate " +
+ "a unique tag, not clashing with an existing tag")
+ public void serverGeneratedTagUniqueness() throws Exception
+ {
+ try(FrameTransport transport = new
FrameTransport(getBrokerAdmin()).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+
+
+ String queueName = BrokerAdmin.TEST_QUEUE_NAME;
+ String consumerTag = "sgen_1";
+ interaction.negotiateOpen()
+ .channel().open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .basic().qosPrefetchCount(1)
+ .qos()
+ .consumeResponse(BasicQosOkBody.class)
+ .basic().consumeConsumerTag(consumerTag)
+ .consumeQueue(queueName)
+ .consume()
+ .consumeResponse(BasicConsumeOkBody.class)
+ .channel().flow(true)
+ .consumeResponse(ChannelFlowOkBody.class);
+
+ BasicConsumeOkBody consumeOK =
interaction.basic().consumeConsumerTag("")
+ .consumeQueue(queueName)
+ .consume()
+
.consumeResponse(BasicConsumeOkBody.class).getLatestResponse(BasicConsumeOkBody.class);
+ assertThat(consumeOK.getConsumerTag().toString(),
is(not(equalTo(consumerTag))));
+ }
+ }
+
@Test
@SpecificationTest(section = "1.8.3.13",
description = "The server MUST validate that a non-zero
delivery-tag refers to a delivered message,"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]