This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 0327d06d84 ARTEMIS-5493 MQTT max in-flight messages setting
     new 4d7c2f7a97 This closes #5706
0327d06d84 is described below

commit 0327d06d84f2cc0b3adda852be1fd06158b6ac21
Author: Evgeniy Devyatykh <[email protected]>
AuthorDate: Fri May 23 12:31:17 2025 +0500

    ARTEMIS-5493 MQTT max in-flight messages setting
---
 .../core/protocol/mqtt/MQTTProtocolManager.java    | 10 +++
 .../artemis/core/protocol/mqtt/MQTTSession.java    |  2 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java    |  7 +-
 .../artemis/core/protocol/mqtt/MQTTUtil.java       |  2 +
 docs/user-manual/mqtt.adoc                         |  6 +-
 .../mqtt/MQTTMaxInFlightPublishesTest.java         | 90 ++++++++++++++++++++++
 .../mqtt5/spec/controlpackets/PublishTests.java    | 26 ++++++-
 7 files changed, 137 insertions(+), 6 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index bae4cd77a5..9411e10d69 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -72,6 +72,8 @@ public class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage, MQ
 
    private boolean allowLinkStealing = true;
 
+   private int defaultMaximumInFlightPublishMessages = 
MQTTUtil.DEFAULT_MAXIMUM_IN_FLIGHT_PUBLISH_MESSAGES;
+
    private final MQTTRoutingHandler routingHandler;
 
    private MQTTStateManager sessionStateManager;
@@ -154,6 +156,14 @@ public class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage, MQ
       this.allowLinkStealing = allowLinkStealing;
    }
 
+   public int getDefaultMaximumInFlightPublishMessages() {
+      return defaultMaximumInFlightPublishMessages;
+   }
+
+   public void setDefaultMaximumInFlightPublishMessages(int 
defaultMaximumInFlightPublishMessages) {
+      this.defaultMaximumInFlightPublishMessages = 
defaultMaximumInFlightPublishMessages;
+   }
+
    @Override
    public void onNotification(Notification notification) {
       if (!(notification.getType() instanceof CoreNotificationType type))
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 1f8b9ecd4c..eadd91097b 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -89,7 +89,7 @@ public class MQTTSession {
 
       mqttConnectionManager = new MQTTConnectionManager(this);
       mqttPublishManager = new MQTTPublishManager(this, 
protocolManager.isCloseMqttConnectionOnPublishAuthorizationFailure());
-      sessionCallback = new MQTTSessionCallback(this, connection);
+      sessionCallback = new MQTTSessionCallback(this, connection, 
protocolManager.getDefaultMaximumInFlightPublishMessages());
       subscriptionManager = new MQTTSubscriptionManager(this, stateManager);
       retainMessageManager = new MQTTRetainMessageManager(this);
 
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 7b4b03aea0..9b80d5b66f 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -26,10 +26,12 @@ public class MQTTSessionCallback implements SessionCallback 
{
 
    private final MQTTSession session;
    private final MQTTConnection connection;
+   private final int defaultMaximumInFlightPublishMessages;
 
-   public MQTTSessionCallback(MQTTSession session, MQTTConnection connection) 
throws Exception {
+   public MQTTSessionCallback(MQTTSession session, MQTTConnection connection, 
int defaultMaximumInFlightPublishMessages) throws Exception {
       this.session = session;
       this.connection = connection;
+      this.defaultMaximumInFlightPublishMessages = 
defaultMaximumInFlightPublishMessages;
    }
 
    @Override
@@ -107,7 +109,8 @@ public class MQTTSessionCallback implements SessionCallback 
{
        *
        * Therefore, enforce flow-control based on the number of pending QoS 1 
& 2 messages
        */
-      if (ref != null && ref.isDurable() == true && 
connection.getReceiveMaximum() != -1 && 
session.getState().getOutboundStore().getSendQuota() >= 
connection.getReceiveMaximum()) {
+      int maxInFlightPublishMessages = connection.getReceiveMaximum() > 0 ? 
connection.getReceiveMaximum() : defaultMaximumInFlightPublishMessages;
+      if (ref != null && ref.isDurable() == true && maxInFlightPublishMessages 
> 0 && session.getState().getOutboundStore().getSendQuota() >= 
maxInFlightPublishMessages) {
          return false;
       } else {
          return true;
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index eb6dcafa17..df1e640be7 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -139,6 +139,8 @@ public class MQTTUtil {
 
    public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
 
+   public static final int DEFAULT_MAXIMUM_IN_FLIGHT_PUBLISH_MESSAGES = 
TWO_BYTE_INT_MAX;
+
    public static final WildcardConfiguration MQTT_WILDCARD = new 
WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS);
 
    /**
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index 62b7e0e5e2..59f94a50a3 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -217,10 +217,12 @@ This can be changed using the 
`mqtt-session-scan-interval` element set in the `c
 
 == Flow Control
 
+MQTT 3.x lacks a flow control mechanism. The sending party determines how many 
QoS 1 & 2 messages can be published without acknowledgment. On the broker side, 
this is controlled by the `defaultMaximumInFlightPublishMessages` URL parameter 
on the MQTT `acceptor` in `broker.xml`, which defaults to `65535`.
+
 MQTT 5 introduced a simple form of 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control[flow
 control].
 In short, a broker can tell a client how many QoS 1 & 2 messages it can 
receive before being acknowledged and vice versa.
 
-This is controlled on the broker by setting the `receiveMaximum` URL parameter 
on the MQTT `acceptor` in `broker.xml`.
+This is controlled on the broker by setting the `receiveMaximum` URL parameter 
on the MQTT `acceptor`.
 
 The default value is `65535` (the maximum value of the 2-byte integer used by  
MQTT).
 
@@ -229,6 +231,8 @@ A value of `0` is prohibited by the MQTT 5 specification.
 A value of `-1` will prevent the broker from informing the client of any 
receive maximum which means flow-control will be disabled from clients to the 
broker.
 This is effectively the same as setting the value to `65535`, but reduces the 
size of the `CONNACK` packet by a few bytes.
 
+If the MQTT 5 client doesn't send the 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901049[Receive
 Maximum] property to the broker, the broker uses its 
`defaultMaximumInFlightPublishMessages` setting to determine the maximum number 
of QoS 1 & 2 messages it can send without acknowledgment.
+
 == Topic Alias Maximum
 
 MQTT 5 introduced 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Topic_Alias[topic
 aliasing].
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTMaxInFlightPublishesTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTMaxInFlightPublishesTest.java
new file mode 100644
index 0000000000..136ab7d5d9
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTMaxInFlightPublishesTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class MQTTMaxInFlightPublishesTest extends MQTTTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Test
+   @Timeout(60)
+   public void testCustomDefaultMaximumInFlightPublishMessages() throws 
Exception {
+      final MQTT subscriberClient = createMQTTConnection("MQTT-Sub-Client", 
false);
+      final MQTT publisherClient = createMQTTConnection("MQTT-Pub-Client", 
false);
+
+      BlockingConnection subscriberConn = 
subscriberClient.blockingConnection();
+      subscriberConn.connect();
+
+      String topic = "TopicA";
+      subscriberConn.subscribe(new Topic[]{new Topic(topic, 
QoS.AT_LEAST_ONCE)});
+
+      BlockingConnection publisherConn = publisherClient.blockingConnection();
+      publisherConn.connect();
+
+      publisherConn.publish(topic, new byte[]{1}, QoS.AT_LEAST_ONCE, false);
+      publisherConn.publish(topic, new byte[]{2}, QoS.AT_LEAST_ONCE, false);
+      publisherConn.publish(topic, new byte[]{3}, QoS.AT_LEAST_ONCE, false);
+      publisherConn.publish(topic, new byte[]{4}, QoS.AT_LEAST_ONCE, false);
+
+      Message msg1 = subscriberConn.receive();
+      assertEquals(1, msg1.getPayload()[0]);
+      Message msg2 = subscriberConn.receive();
+      assertEquals(2, msg2.getPayload()[0]);
+
+      Message msg3 = subscriberConn.receive(100, TimeUnit.MILLISECONDS);
+      assertNull(msg3);
+
+      msg1.ack();
+      msg3 = subscriberConn.receive();
+      assertEquals(3, msg3.getPayload()[0]);
+
+      Message msg4 = subscriberConn.receive(100, TimeUnit.MILLISECONDS);
+      assertNull(msg4);
+
+      msg3.ack();
+      msg4 = subscriberConn.receive();
+      assertEquals(4, msg4.getPayload()[0]);
+
+
+      subscriberConn.disconnect();
+      publisherConn.disconnect();
+
+   }
+
+   @Override
+   protected void addMQTTConnector() throws Exception {
+      server.getConfiguration().addAcceptorConfiguration("MQTT", 
"tcp://localhost:" + port + 
"?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:;defaultMaximumInFlightPublishMessages=2");
+
+      logger.debug("Added MQTT connector to broker");
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index fcb6f02d7d..840c99b7d9 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -1602,7 +1602,20 @@ public class PublishTests extends MQTT5TestSupport {
     */
    @Test
    @Timeout(DEFAULT_TIMEOUT_SEC)
-   public void testReceiveMaximum() throws Exception {
+   public void testReceiveMaximumSetByClient() throws Exception {
+      testReceiveMaximum(false);
+   }
+
+   /**
+    * @see PublishTests#testReceiveMaximumSetByClient()
+    */
+   @Test
+   @Timeout(DEFAULT_TIMEOUT_SEC)
+   public void 
testImplicitReceiveMaximumByDefaultMaximumInFlightPublishMessages() throws 
Exception {
+      testReceiveMaximum(true);
+   }
+
+   private void testReceiveMaximum(boolean useDefault) throws Exception {
       AtomicInteger count = new AtomicInteger(0);
       AtomicBoolean failed = new AtomicBoolean(false);
       final int MESSAGE_COUNT = 50;
@@ -1625,13 +1638,21 @@ public class PublishTests extends MQTT5TestSupport {
       server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
       server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
 
+      if (useDefault) {
+         
getProtocolManager().setDefaultMaximumInFlightPublishMessages(RECEIVE_MAXIMUM); 
// must be used when missing Receive Maximum from the client
+      } else {
+         getProtocolManager().setDefaultMaximumInFlightPublishMessages(1); // 
Receive Maximum from the client must override this
+      }
+
       final String TOPIC = this.getTopicName();
 
       final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
       final String CONSUMER_ID = "consumer";
       MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID);
       MqttConnectionOptions options = new MqttConnectionOptions();
-      options.setReceiveMaximum(RECEIVE_MAXIMUM);
+      if (!useDefault) {
+         options.setReceiveMaximum(RECEIVE_MAXIMUM);
+      }
       consumer.connect(options).waitForCompletion();
       consumer.setCallback(new DefaultMqttCallback() {
          @Override
@@ -1690,6 +1711,7 @@ public class PublishTests extends MQTT5TestSupport {
       };
       server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
       server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
+      getProtocolManager().setDefaultMaximumInFlightPublishMessages(1); // 
must not be taken into account for QoS 0
 
       final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
       final String CONSUMER_ID = "consumer";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to