brusdev commented on code in PR #4563:
URL: https://github.com/apache/activemq-artemis/pull/4563#discussion_r1272098152
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java:
##########
@@ -1020,6 +1020,74 @@ public void messageArrived(String topic, MqttMessage
message) throws Exception {
consumer.disconnect();
}
+ /*
+ * From section 3.3.2.3.4 of the MQTT 5 specification:
+ *
+ * A sender can modify the Topic Alias mapping by sending another PUBLISH
in the same Network Connection with the
+ * same Topic Alias value and a different non-zero length Topic Name.
+ */
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testModifiedTopicAlias() throws Exception {
+ final String TOPIC_1 = this.getTopicName() + "1";
+ final String TOPIC_2 = this.getTopicName() + "2";
+
+ MqttClient consumer1 = createPahoClient("consumer1");
+ CountDownLatch latch1 = new CountDownLatch(2);
+ consumer1.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws
Exception {
+ String payload = new String(message.getPayload());
+ if (payload.equals("first") || payload.equals("second")) {
+ latch1.countDown();
+ }
+ }
+ });
+ consumer1.connect();
+ consumer1.subscribe(TOPIC_1, 1);
+
+ MqttClient consumer2 = createPahoClient("consumer2");
+ CountDownLatch latch2 = new CountDownLatch(2);
+ consumer2.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws
Exception {
+ String payload = new String(message.getPayload());
+ if (payload.equals("third") || payload.equals("fourth")) {
+ latch2.countDown();
+ }
+ }
+ });
+ consumer2.connect();
+ consumer2.subscribe(TOPIC_2, 1);
+
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+
+ MqttProperties properties = new MqttProperties();
+ properties.setTopicAlias(1);
+ MqttMessage m = new MqttMessage();
+ m.setProperties(properties);
+ m.setQos(1);
+ m.setRetained(false);
+ m.setPayload("first".getBytes(StandardCharsets.UTF_8));
+ producer.publish(TOPIC_1, m);
+ m.setPayload("second".getBytes(StandardCharsets.UTF_8));
+ producer.publish("", m);
Review Comment:
The test `testModifiedTopicAlias` is failing in my env with the following
error:
```
[main] 12:51:06,802 INFO
[org.apache.activemq.artemis.tests.util.ActiveMQTestBase] **** end #test
testModifiedTopicAlias[protocol=tcp]() ***
java.lang.IllegalArgumentException: Invalid topic length, should be in
range[1, 65535]!
at
org.eclipse.paho.mqttv5.common.util.MqttTopicValidator.validate(MqttTopicValidator.java:69)
at
org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1519)
at
org.eclipse.paho.mqttv5.client.MqttClient.publish(MqttClient.java:564)
at
org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets.PublishTests.testModifiedTopicAlias(PublishTests.java:1074)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
at
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)
```
##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java:
##########
@@ -189,27 +189,31 @@ void sendToQueue(MqttPublishMessage message, boolean
internal) throws Exception
String topic = message.variableHeader().topicName();
if (session.getVersion() == MQTTVersion.MQTT_5) {
Integer alias = MQTTUtil.getProperty(Integer.class,
message.variableHeader().properties(), TOPIC_ALIAS);
- Integer topicAliasMax =
session.getProtocolManager().getTopicAliasMaximum();
if (alias != null) {
+ Integer topicAliasMax =
session.getProtocolManager().getTopicAliasMaximum();
if (alias == 0) {
// [MQTT-3.3.2-8]
throw new
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
} else if (topicAliasMax != null && alias > topicAliasMax) {
// [MQTT-3.3.2-9]
throw new
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
- } else {
- topic = session.getState().getClientTopicAlias(alias);
- if (topic == null) {
- topic = message.variableHeader().topicName();
- if (topic == null || topic.length() == 0) {
- // using a topic alias with no matching topic in the
state; potentially [MQTT-3.3.2-7]
- throw new
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
- }
- session.getState().addClientTopicAlias(alias, topic);
+ }
+
+ String existingTopicMapping =
session.getState().getClientTopicAlias(alias);
+ if (existingTopicMapping == null) {
+ if (topic == null || topic.length() == 0) {
+ // using a topic alias with no matching topic in the
state; potentially [MQTT-3.3.2-7]
+ throw new
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
}
+ logger.debug("Adding new alias {} for topic {}", alias,
topic);
+ session.getState().addClientTopicAlias(alias, topic);
+ } else if (topic != null && topic.length() > 0) {
+ logger.debug("Modifying existing alias {}. New value: {};
old value: {}", alias, topic, existingTopicMapping);
+ session.getState().addClientTopicAlias(alias, topic);
Review Comment:
The add verb is a bit confusing, what about put?
```suggestion
session.getState().putClientTopicAlias(alias, topic);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]