This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 055100751c ARTEMIS-4459 log when ignoring dupe MQTT QoS2 pub
055100751c is described below

commit 055100751cba548b9c52643c84f47ffcaf82b616
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Oct 12 15:41:40 2023 -0500

    ARTEMIS-4459 log when ignoring dupe MQTT QoS2 pub
    
    In accordance with the QoS2 protocol outlined in the MQTT
    specification(s), once the broker receives a PUBLISH then any other
    PUBLISH it receives on that same session with the same packet ID must be
    ignored until the QoS2 protocol for that ID is completed.
    
    The broker does this, but it doesn't log anything so it's not clear when
    this is actually happening.
---
 .../activemq/artemis/core/protocol/mqtt/MQTTLogger.java     |  3 +++
 .../artemis/core/protocol/mqtt/MQTTPublishManager.java      | 13 ++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
index 834dadba2c..ca4cb9f7d3 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
@@ -61,4 +61,7 @@ public interface MQTTLogger {
 
    @LogMessage(id = 834008, value = "Failed to remove session state for client 
with ID: {}", level = LogMessage.Level.ERROR)
    void failedToRemoveSessionState(String clientID, Exception e);
+
+   @LogMessage(id = 834009, value = "Ignoring duplicate MQTT QoS2 PUBLISH 
packet from client with ID {} for packet ID {}.", level = LogMessage.Level.WARN)
+   void ignoringQoS2Publish(String clientId, long packetId);
 }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 057e2a957b..eb31cccfb8 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -223,10 +223,11 @@ public class MQTTPublishManager {
          if (qos > 0) {
             serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
          }
-         int messageId = message.variableHeader().packetId();
-         if (qos < 2 || !state.getPubRec().contains(messageId)) {
+         int packetId = message.variableHeader().packetId();
+         boolean qos2PublishAlreadyReceived = 
state.getPubRec().contains(packetId);
+         if (qos < 2 || !qos2PublishAlreadyReceived) {
             if (qos == 2 && !internal)
-               state.getPubRec().add(messageId);
+               state.getPubRec().add(packetId);
 
             Transaction tx = session.getServerSession().newTransaction();
             try {
@@ -252,7 +253,7 @@ public class MQTTPublishManager {
                   throw e;
                }
                if (session.getVersion() == MQTTVersion.MQTT_5) {
-                  sendMessageAck(internal, qos, messageId, 
MQTTReasonCodes.NOT_AUTHORIZED);
+                  sendMessageAck(internal, qos, packetId, 
MQTTReasonCodes.NOT_AUTHORIZED);
                   return;
                } else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
                   /*
@@ -287,9 +288,11 @@ public class MQTTPublishManager {
                tx.rollback();
                throw t;
             }
+         } else if (qos2PublishAlreadyReceived) {
+            MQTTLogger.LOGGER.ignoringQoS2Publish(state.getClientId(), 
packetId);
          }
 
-         createMessageAck(messageId, qos, internal);
+         createMessageAck(packetId, qos, internal);
       }
    }
 

Reply via email to