Evgeniy Devyatykh created ARTEMIS-5469:
------------------------------------------
Summary: MQTT5 flow control rules violation
Key: ARTEMIS-5469
URL: https://issues.apache.org/jira/browse/ARTEMIS-5469
Project: ActiveMQ Artemis
Issue Type: Bug
Components: MQTT
Reporter: Evgeniy Devyatykh
Hi!
According to the MQTT 5 specification:
[https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html]
{quote}
h2. 4.4 Message delivery retry
When a Client reconnects with Clean Start set to 0 and a session is present,
both the Client and Server MUST resend any unacknowledged PUBLISH packets
(where QoS > 0) and PUBREL packets using their original Packet Identifiers.
This is the only circumstance where a Client or Server is REQUIRED to resend
messages. Clients and Servers MUST NOT resend messages at any other time
[MQTT-4.4.0-1].
h2. 4.9 Flow Control
Clients and Servers control the number of unacknowledged PUBLISH packets they
receive by using a Receive Maximum value.
The Client or Server MUST set its initial send quota to a non-zero value not
exceeding the Receive Maximum [MQTT-4.9.0-1].
Each time the Client or Server sends a PUBLISH packet at QoS > 0, it decrements
the send quota. If the send quota reaches zero, the Client or Server MUST NOT
send any more PUBLISH packets with QoS > 0 [MQTT-4.9.0-2]. It MAY continue to
send PUBLISH packets with QoS 0, or it MAY choose to suspend sending these as
well. The Client and Server MUST continue to process and respond to all other
MQTT Control Packets even if the quota is zero [MQTT-4.9.0-3].
The send quota is incremented by 1:
- Each time a PUBACK or PUBCOMP packet is received, regardless of whether the
PUBACK or PUBCOMP carried an error code.
- Each time a PUBREC packet is received with a Return Code of 0x80 or greater.
The send quota is not incremented if it is already equal to the initial send
quota. The attempt to increment above the initial send quota might be caused by
the re-transmission of a PUBREL packet after a new Network Connection is
established.
*The send quota and Receive Maximum value are not preserved across Network
Connections, and are re-initialized with each new Network Connection as
described above. They are not part of the session state.*
{quote}
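The quota rules quoted above can be sketched as a small per-connection counter. This is only an illustration of the specification text, not Artemis code: the class name SendQuota and its methods are hypothetical. It assumes one instance is created for each new network connection and discarded on disconnect, so nothing carries over with the session.

```java
/** Hypothetical per-connection send quota following the rules quoted from section 4.9. */
class SendQuota {
    // Initial quota: non-zero and not exceeding the peer's Receive Maximum [MQTT-4.9.0-1].
    private final int initialQuota;
    private int quota;

    SendQuota(int receiveMaximum) {
        if (receiveMaximum <= 0) {
            throw new IllegalArgumentException("Receive Maximum must be positive");
        }
        this.initialQuota = receiveMaximum;
        this.quota = receiveMaximum;
    }

    /** Returns true and decrements the quota if a QoS > 0 PUBLISH may be sent now. */
    boolean tryAcquire() {
        if (quota == 0) {
            return false; // [MQTT-4.9.0-2]: no more QoS > 0 PUBLISH packets
        }
        quota--;
        return true;
    }

    /** Call on PUBACK, PUBCOMP, or PUBREC with a code of 0x80 or greater. */
    void release() {
        // The quota is never incremented above its initial value.
        if (quota < initialQuota) {
            quota++;
        }
    }

    int available() {
        return quota;
    }
}
```

In this sketch a reconnect simply constructs a new SendQuota from the Receive Maximum of the new CONNECT packet, which is how the quota ends up re-initialized rather than restored.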
However, org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionCallback implements the credit check as follows:
{code:java}
public boolean hasCredits(ServerConsumer consumerID, MessageReference ref) {
   /*
    * [MQTT-3.3.4-9] The Server MUST NOT send more than Receive Maximum QoS 1 and QoS 2 PUBLISH packets for which it
    * has not received PUBACK, PUBCOMP, or PUBREC with a Reason Code of 128 or greater from the Client.
    *
    * Therefore, enforce flow-control based on the number of pending QoS 1 & 2 messages
    */
   if (ref != null && ref.isDurable() == true && connection.getReceiveMaximum() != -1 && session.getState().getOutboundStore().getPendingMessages() >= connection.getReceiveMaximum()) {
      return false;
   } else {
      return true;
   }
}
{code}
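This check counts pending messages through the session's OutboundStore, which belongs to the session rather than to the network connection. The send quota is therefore effectively carried over between connections, and the following sequence fails:
- The client connects with cleanStart=0, session expiry interval > 0 and receive maximum = 1.
- The broker sends a PUBLISH and stores the pending packet ID in the OutboundStore.
- The client disconnects without sending a PUBACK.
- The client reconnects with cleanStart=0, session expiry interval > 0 and receive maximum = 1.
- The broker does not retry delivery of the pending message, because hasCredits() returns false.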
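The sequence above can be modelled in a few lines to show why the two ways of counting disagree. This is a simplified sketch, not Artemis code and not a proposed patch: ReconnectScenario, Session, Connection and both check methods are hypothetical names, and the model leaves out the ref.isDurable() and receive maximum != -1 conditions of the real method.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical model of the reported sequence; none of these names are Artemis APIs. */
class ReconnectScenario {

    /** Session state: packet IDs of QoS > 0 PUBLISH packets still awaiting acknowledgement. */
    static final class Session {
        final Set<Integer> pendingIds = new LinkedHashSet<>();
    }

    /** One network connection; the send quota lives here and is rebuilt on every connect. */
    static final class Connection {
        final int receiveMaximum;
        int sendQuota;

        Connection(int receiveMaximum) {
            this.receiveMaximum = receiveMaximum;
            this.sendQuota = receiveMaximum;
        }
    }

    /** Check as described in the report: session-wide pending count against Receive Maximum. */
    static boolean hasCreditsSessionScoped(Session session, Connection connection) {
        return session.pendingIds.size() < connection.receiveMaximum;
    }

    /** Alternative check: consults only the quota of the current connection. */
    static boolean hasCreditsConnectionScoped(Connection connection) {
        return connection.sendQuota > 0;
    }

    public static void main(String[] args) {
        Session session = new Session();

        // First connection: cleanStart=0, receive maximum = 1.
        Connection first = new Connection(1);
        session.pendingIds.add(1); // broker sends PUBLISH with packet ID 1
        first.sendQuota--;
        // The client disconnects without PUBACK; packet ID 1 stays in the session.

        // Second connection: cleanStart=0, receive maximum = 1, fresh quota.
        Connection second = new Connection(1);

        System.out.println("session-scoped check allows resend: "
                + hasCreditsSessionScoped(session, second));    // false
        System.out.println("connection-scoped check allows resend: "
                + hasCreditsConnectionScoped(second));          // true
    }
}
```

With the session-scoped count, the unacknowledged message from the first connection uses up the only credit of the second connection before anything has been sent on it, so the resend required by section 4.4 never happens. In the connection-scoped variant the resend itself would take that credit.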