This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 22e3b09b9c ARTEMIS-4370 update existing topic alias for MQTT 5
publisher
22e3b09b9c is described below
commit 22e3b09b9c2ad8c1c0da6e6bc97f4f0bf5e28af3
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Jul 21 13:05:25 2023 -0500
ARTEMIS-4370 update existing topic alias for MQTT 5 publisher
---
.../core/protocol/mqtt/MQTTPublishManager.java | 28 ++++++----
.../core/protocol/mqtt/MQTTSessionState.java | 2 +-
.../mqtt5/spec/controlpackets/PublishTests.java | 65 ++++++++++++++++++++++
3 files changed, 82 insertions(+), 13 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 57b9605d21..057e2a957b 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -189,27 +189,31 @@ public class MQTTPublishManager {
String topic = message.variableHeader().topicName();
if (session.getVersion() == MQTTVersion.MQTT_5) {
Integer alias = MQTTUtil.getProperty(Integer.class,
message.variableHeader().properties(), TOPIC_ALIAS);
- Integer topicAliasMax =
session.getProtocolManager().getTopicAliasMaximum();
if (alias != null) {
+ Integer topicAliasMax =
session.getProtocolManager().getTopicAliasMaximum();
if (alias == 0) {
// [MQTT-3.3.2-8]
throw new
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
} else if (topicAliasMax != null && alias > topicAliasMax) {
// [MQTT-3.3.2-9]
throw new
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
- } else {
- topic = session.getState().getClientTopicAlias(alias);
- if (topic == null) {
- topic = message.variableHeader().topicName();
- if (topic == null || topic.length() == 0) {
- // using a topic alias with no matching topic in the
state; potentially [MQTT-3.3.2-7]
- throw new
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
- }
- session.getState().addClientTopicAlias(alias, topic);
+ }
+
+ String existingTopicMapping =
session.getState().getClientTopicAlias(alias);
+ if (existingTopicMapping == null) {
+ if (topic == null || topic.length() == 0) {
+ // using a topic alias with no matching topic in the
state; potentially [MQTT-3.3.2-7]
+ throw new
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
}
+ logger.debug("Adding new alias {} for topic {}", alias,
topic);
+ session.getState().putClientTopicAlias(alias, topic);
+ } else if (topic != null && topic.length() > 0) {
+ logger.debug("Modifying existing alias {}. New value: {};
old value: {}", alias, topic, existingTopicMapping);
+ session.getState().putClientTopicAlias(alias, topic);
+ } else {
+ logger.debug("Applying topic {} for alias {}",
existingTopicMapping, alias);
+ topic = existingTopicMapping;
}
- } else {
- topic = message.variableHeader().topicName();
}
}
String coreAddress =
MQTTUtil.convertMqttTopicFilterToCoreAddress(topic,
session.getWildcardConfiguration());
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 23209c7f14..33e6f159ed 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -307,7 +307,7 @@ public class MQTTSessionState {
this.clientMaxPacketSize = clientMaxPacketSize;
}
- public void addClientTopicAlias(Integer alias, String topicName) {
+ public void putClientTopicAlias(Integer alias, String topicName) {
if (clientTopicAliases == null) {
clientTopicAliases = new HashMap<>();
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index 84ad469961..de0b6a7abf 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -1020,6 +1020,71 @@ public class PublishTests extends MQTT5TestSupport {
consumer.disconnect();
}
+ /*
+ * From section 3.3.2.3.4 of the MQTT 5 specification:
+ *
+ * A sender can modify the Topic Alias mapping by sending another PUBLISH
in the same Network Connection with the
+ * same Topic Alias value and a different non-zero length Topic Name.
+ */
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testModifiedTopicAlias() throws Exception {
+ final String TOPIC_1 = this.getTopicName() + "1";
+ final String TOPIC_2 = this.getTopicName() + "2";
+
+ MqttClient consumer1 = createPahoClient("consumer1");
+ CountDownLatch latch1 = new CountDownLatch(1);
+ consumer1.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws
Exception {
+ String payload = new String(message.getPayload());
+ if (payload.equals("first")) {
+ latch1.countDown();
+ }
+ }
+ });
+ consumer1.connect();
+ consumer1.subscribe(TOPIC_1, 1);
+
+ MqttClient consumer2 = createPahoClient("consumer2");
+ CountDownLatch latch2 = new CountDownLatch(1);
+ consumer2.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws
Exception {
+ String payload = new String(message.getPayload());
+ if (payload.equals("second")) {
+ latch2.countDown();
+ }
+ }
+ });
+ consumer2.connect();
+ consumer2.subscribe(TOPIC_2, 1);
+
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+
+ MqttProperties properties = new MqttProperties();
+ properties.setTopicAlias(1);
+ MqttMessage m = new MqttMessage();
+ m.setProperties(properties);
+ m.setQos(1);
+ m.setRetained(false);
+ m.setPayload("first".getBytes(StandardCharsets.UTF_8));
+ producer.publish(TOPIC_1, m);
+ m.setPayload("second".getBytes(StandardCharsets.UTF_8));
+ producer.publish(TOPIC_2, m);
+
+ producer.disconnect();
+ producer.close();
+
+ assertTrue(latch1.await(2, TimeUnit.SECONDS));
+ assertTrue(latch2.await(2, TimeUnit.SECONDS));
+
+ consumer1.disconnect();
+ consumer1.close();
+ consumer2.disconnect();
+ consumer2.close();
+ }
+
/*
* [MQTT-3.3.2-15] The Server MUST send the Response Topic unaltered to all
subscribers receiving the Application
* Message.