This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 0327d06d84 ARTEMIS-5493 MQTT max in-flight messages setting
new 4d7c2f7a97 This closes #5706
0327d06d84 is described below
commit 0327d06d84f2cc0b3adda852be1fd06158b6ac21
Author: Evgeniy Devyatykh <[email protected]>
AuthorDate: Fri May 23 12:31:17 2025 +0500
ARTEMIS-5493 MQTT max in-flight messages setting
---
.../core/protocol/mqtt/MQTTProtocolManager.java | 10 +++
.../artemis/core/protocol/mqtt/MQTTSession.java | 2 +-
.../core/protocol/mqtt/MQTTSessionCallback.java | 7 +-
.../artemis/core/protocol/mqtt/MQTTUtil.java | 2 +
docs/user-manual/mqtt.adoc | 6 +-
.../mqtt/MQTTMaxInFlightPublishesTest.java | 90 ++++++++++++++++++++++
.../mqtt5/spec/controlpackets/PublishTests.java | 26 ++++++-
7 files changed, 137 insertions(+), 6 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index bae4cd77a5..9411e10d69 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -72,6 +72,8 @@ public class MQTTProtocolManager extends
AbstractProtocolManager<MqttMessage, MQ
private boolean allowLinkStealing = true;
+ private int defaultMaximumInFlightPublishMessages =
MQTTUtil.DEFAULT_MAXIMUM_IN_FLIGHT_PUBLISH_MESSAGES;
+
private final MQTTRoutingHandler routingHandler;
private MQTTStateManager sessionStateManager;
@@ -154,6 +156,14 @@ public class MQTTProtocolManager extends
AbstractProtocolManager<MqttMessage, MQ
this.allowLinkStealing = allowLinkStealing;
}
+ public int getDefaultMaximumInFlightPublishMessages() {
+ return defaultMaximumInFlightPublishMessages;
+ }
+
+ public void setDefaultMaximumInFlightPublishMessages(int
defaultMaximumInFlightPublishMessages) {
+ this.defaultMaximumInFlightPublishMessages =
defaultMaximumInFlightPublishMessages;
+ }
+
@Override
public void onNotification(Notification notification) {
if (!(notification.getType() instanceof CoreNotificationType type))
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 1f8b9ecd4c..eadd91097b 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -89,7 +89,7 @@ public class MQTTSession {
mqttConnectionManager = new MQTTConnectionManager(this);
mqttPublishManager = new MQTTPublishManager(this,
protocolManager.isCloseMqttConnectionOnPublishAuthorizationFailure());
- sessionCallback = new MQTTSessionCallback(this, connection);
+ sessionCallback = new MQTTSessionCallback(this, connection,
protocolManager.getDefaultMaximumInFlightPublishMessages());
subscriptionManager = new MQTTSubscriptionManager(this, stateManager);
retainMessageManager = new MQTTRetainMessageManager(this);
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 7b4b03aea0..9b80d5b66f 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -26,10 +26,12 @@ public class MQTTSessionCallback implements SessionCallback
{
private final MQTTSession session;
private final MQTTConnection connection;
+ private final int defaultMaximumInFlightPublishMessages;
- public MQTTSessionCallback(MQTTSession session, MQTTConnection connection)
throws Exception {
+ public MQTTSessionCallback(MQTTSession session, MQTTConnection connection,
int defaultMaximumInFlightPublishMessages) throws Exception {
this.session = session;
this.connection = connection;
+ this.defaultMaximumInFlightPublishMessages =
defaultMaximumInFlightPublishMessages;
}
@Override
@@ -107,7 +109,8 @@ public class MQTTSessionCallback implements SessionCallback
{
*
* Therefore, enforce flow-control based on the number of pending QoS 1
& 2 messages
*/
- if (ref != null && ref.isDurable() == true &&
connection.getReceiveMaximum() != -1 &&
session.getState().getOutboundStore().getSendQuota() >=
connection.getReceiveMaximum()) {
+ int maxInFlightPublishMessages = connection.getReceiveMaximum() > 0 ?
connection.getReceiveMaximum() : defaultMaximumInFlightPublishMessages;
+ if (ref != null && ref.isDurable() == true && maxInFlightPublishMessages
> 0 && session.getState().getOutboundStore().getSendQuota() >=
maxInFlightPublishMessages) {
return false;
} else {
return true;
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index eb6dcafa17..df1e640be7 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -139,6 +139,8 @@ public class MQTTUtil {
public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
+ public static final int DEFAULT_MAXIMUM_IN_FLIGHT_PUBLISH_MESSAGES =
TWO_BYTE_INT_MAX;
+
public static final WildcardConfiguration MQTT_WILDCARD = new
WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS);
/**
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index 62b7e0e5e2..59f94a50a3 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -217,10 +217,12 @@ This can be changed using the
`mqtt-session-scan-interval` element set in the `c
== Flow Control
+MQTT 3.x lacks a flow control mechanism, so the sending party alone determines how many
QoS 1 & 2 messages it publishes before receiving acknowledgments. For messages the broker sends to clients,
this limit is set by the `defaultMaximumInFlightPublishMessages` URL parameter
on the MQTT `acceptor` in `broker.xml`, which defaults to `65535`.
+
MQTT 5 introduced a simple form of
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control[flow
control].
In short, a broker can tell a client how many QoS 1 & 2 messages it can
receive before being acknowledged and vice versa.
-This is controlled on the broker by setting the `receiveMaximum` URL parameter
on the MQTT `acceptor` in `broker.xml`.
+This is controlled on the broker by setting the `receiveMaximum` URL parameter
on the MQTT `acceptor`.
The default value is `65535` (the maximum value of the 2-byte integer used by
MQTT).
@@ -229,6 +231,8 @@ A value of `0` is prohibited by the MQTT 5 specification.
A value of `-1` will prevent the broker from informing the client of any
receive maximum, which means flow-control from clients to the broker will be
disabled.
This is effectively the same as setting the value to `65535`, but reduces the
size of the `CONNACK` packet by a few bytes.
+If the MQTT 5 client doesn't send the
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901049[Receive
Maximum] property to the broker, the broker uses its
`defaultMaximumInFlightPublishMessages` setting to determine the maximum number
of QoS 1 & 2 messages it can send without acknowledgment. If that setting is `0` or negative, the broker does not enforce this limit.
+
== Topic Alias Maximum
MQTT 5 introduced
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Topic_Alias[topic
aliasing].
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTMaxInFlightPublishesTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTMaxInFlightPublishesTest.java
new file mode 100644
index 0000000000..136ab7d5d9
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTMaxInFlightPublishesTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class MQTTMaxInFlightPublishesTest extends MQTTTestSupport {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Test
+ @Timeout(60)
+ public void testCustomDefaultMaximumInFlightPublishMessages() throws
Exception {
+ final MQTT subscriberClient = createMQTTConnection("MQTT-Sub-Client",
false);
+ final MQTT publisherClient = createMQTTConnection("MQTT-Pub-Client",
false);
+
+ BlockingConnection subscriberConn =
subscriberClient.blockingConnection();
+ subscriberConn.connect();
+
+ String topic = "TopicA";
+ subscriberConn.subscribe(new Topic[]{new Topic(topic,
QoS.AT_LEAST_ONCE)});
+
+ BlockingConnection publisherConn = publisherClient.blockingConnection();
+ publisherConn.connect();
+
+ publisherConn.publish(topic, new byte[]{1}, QoS.AT_LEAST_ONCE, false);
+ publisherConn.publish(topic, new byte[]{2}, QoS.AT_LEAST_ONCE, false);
+ publisherConn.publish(topic, new byte[]{3}, QoS.AT_LEAST_ONCE, false);
+ publisherConn.publish(topic, new byte[]{4}, QoS.AT_LEAST_ONCE, false);
+
+ Message msg1 = subscriberConn.receive();
+ assertEquals(1, msg1.getPayload()[0]);
+ Message msg2 = subscriberConn.receive();
+ assertEquals(2, msg2.getPayload()[0]);
+
+ Message msg3 = subscriberConn.receive(100, TimeUnit.MILLISECONDS);
+ assertNull(msg3);
+
+ msg1.ack();
+ msg3 = subscriberConn.receive();
+ assertEquals(3, msg3.getPayload()[0]);
+
+ Message msg4 = subscriberConn.receive(100, TimeUnit.MILLISECONDS);
+ assertNull(msg4);
+
+ msg3.ack();
+ msg4 = subscriberConn.receive();
+ assertEquals(4, msg4.getPayload()[0]);
+
+
+ subscriberConn.disconnect();
+ publisherConn.disconnect();
+
+ }
+
+ @Override
+ protected void addMQTTConnector() throws Exception {
+ server.getConfiguration().addAcceptorConfiguration("MQTT",
"tcp://localhost:" + port +
"?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:;defaultMaximumInFlightPublishMessages=2");
+
+ logger.debug("Added MQTT connector to broker");
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index fcb6f02d7d..840c99b7d9 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -1602,7 +1602,20 @@ public class PublishTests extends MQTT5TestSupport {
*/
@Test
@Timeout(DEFAULT_TIMEOUT_SEC)
- public void testReceiveMaximum() throws Exception {
+ public void testReceiveMaximumSetByClient() throws Exception {
+ testReceiveMaximum(false);
+ }
+
+ /**
+ * @see PublishTests#testReceiveMaximumSetByClient()
+ */
+ @Test
+ @Timeout(DEFAULT_TIMEOUT_SEC)
+ public void
testImplicitReceiveMaximumByDefaultMaximumInFlightPublishMessages() throws
Exception {
+ testReceiveMaximum(true);
+ }
+
+ private void testReceiveMaximum(boolean useDefault) throws Exception {
AtomicInteger count = new AtomicInteger(0);
AtomicBoolean failed = new AtomicBoolean(false);
final int MESSAGE_COUNT = 50;
@@ -1625,13 +1638,21 @@ public class PublishTests extends MQTT5TestSupport {
server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
+ if (useDefault) {
+
getProtocolManager().setDefaultMaximumInFlightPublishMessages(RECEIVE_MAXIMUM);
// must be used when missing Receive Maximum from the client
+ } else {
+ getProtocolManager().setDefaultMaximumInFlightPublishMessages(1); //
Receive Maximum from the client must override this
+ }
+
final String TOPIC = this.getTopicName();
final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
final String CONSUMER_ID = "consumer";
MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID);
MqttConnectionOptions options = new MqttConnectionOptions();
- options.setReceiveMaximum(RECEIVE_MAXIMUM);
+ if (!useDefault) {
+ options.setReceiveMaximum(RECEIVE_MAXIMUM);
+ }
consumer.connect(options).waitForCompletion();
consumer.setCallback(new DefaultMqttCallback() {
@Override
@@ -1690,6 +1711,7 @@ public class PublishTests extends MQTT5TestSupport {
};
server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
+ getProtocolManager().setDefaultMaximumInFlightPublishMessages(1); //
must not be taken into account for QoS 0
final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
final String CONSUMER_ID = "consumer";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact