This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 055100751c ARTEMIS-4459 log when ignoring dupe MQTT QoS2 pub
055100751c is described below
commit 055100751cba548b9c52643c84f47ffcaf82b616
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Oct 12 15:41:40 2023 -0500
ARTEMIS-4459 log when ignoring dupe MQTT QoS2 pub
In accordance with the QoS2 protocol outlined in the MQTT
specification(s), once the broker receives a QoS2 PUBLISH, any other
PUBLISH it receives on that same session with the same packet ID must be
ignored until the QoS2 protocol for that ID is completed.
The broker does this, but it doesn't log anything, so it's not clear when
this is actually happening.
---
.../activemq/artemis/core/protocol/mqtt/MQTTLogger.java | 3 +++
.../artemis/core/protocol/mqtt/MQTTPublishManager.java | 13 ++++++++-----
2 files changed, 11 insertions(+), 5 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
index 834dadba2c..ca4cb9f7d3 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
@@ -61,4 +61,7 @@ public interface MQTTLogger {
@LogMessage(id = 834008, value = "Failed to remove session state for client
with ID: {}", level = LogMessage.Level.ERROR)
void failedToRemoveSessionState(String clientID, Exception e);
+
+ @LogMessage(id = 834009, value = "Ignoring duplicate MQTT QoS2 PUBLISH packet from client with ID {} for packet ID {}.", level = LogMessage.Level.WARN)
+ void ignoringQoS2Publish(String clientId, long packetId);
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 057e2a957b..eb31cccfb8 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -223,10 +223,11 @@ public class MQTTPublishManager {
if (qos > 0) {
serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
}
- int messageId = message.variableHeader().packetId();
- if (qos < 2 || !state.getPubRec().contains(messageId)) {
+ int packetId = message.variableHeader().packetId();
+ boolean qos2PublishAlreadyReceived =
state.getPubRec().contains(packetId);
+ if (qos < 2 || !qos2PublishAlreadyReceived) {
if (qos == 2 && !internal)
- state.getPubRec().add(messageId);
+ state.getPubRec().add(packetId);
Transaction tx = session.getServerSession().newTransaction();
try {
@@ -252,7 +253,7 @@ public class MQTTPublishManager {
throw e;
}
if (session.getVersion() == MQTTVersion.MQTT_5) {
- sendMessageAck(internal, qos, messageId,
MQTTReasonCodes.NOT_AUTHORIZED);
+ sendMessageAck(internal, qos, packetId,
MQTTReasonCodes.NOT_AUTHORIZED);
return;
} else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
/*
@@ -287,9 +288,11 @@ public class MQTTPublishManager {
tx.rollback();
throw t;
}
+ } else if (qos2PublishAlreadyReceived) {
+ MQTTLogger.LOGGER.ignoringQoS2Publish(state.getClientId(),
packetId);
}
- createMessageAck(messageId, qos, internal);
+ createMessageAck(packetId, qos, internal);
}
}