This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 019fc86138 ARTEMIS-4542 improve MQTT state storage
019fc86138 is described below

commit 019fc86138239cc34e84dd4c90326e6615e75e6b
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Dec 21 11:32:28 2023 -0600

    ARTEMIS-4542 improve MQTT state storage
    
    This commit:
    
     - Eliminates MQTT session storage on every successful connection.
       Instead data is only written when subsriptions are created or
       destroyed.
     - Adds a configuration property for the storage timeout.
     - Updates the documentation with relevant information.
     - Refactors a few bits of code to eliminate unnecessary variables, etc.
---
 .../api/config/ActiveMQDefaultConfiguration.java   | 10 +++++++++
 .../artemis/core/protocol/mqtt/MQTTLogger.java     |  3 +++
 .../protocol/mqtt/MQTTProtocolManagerFactory.java  | 16 +++++----------
 .../core/protocol/mqtt/MQTTSessionState.java       | 11 +++-------
 .../core/protocol/mqtt/MQTTStateManager.java       | 24 ++++++++++------------
 .../artemis/core/protocol/mqtt/StateSerDeTest.java |  4 ++--
 .../artemis/core/config/Configuration.java         | 17 +++++++++++++--
 .../core/config/impl/ConfigurationImpl.java        | 13 ++++++++++++
 .../deployers/impl/FileConfigurationParser.java    |  2 ++
 .../resources/schema/artemis-configuration.xsd     |  8 ++++++++
 .../core/config/impl/FileConfigurationTest.java    |  1 +
 .../resources/ConfigurationTest-full-config.xml    |  1 +
 .../ConfigurationTest-xinclude-config.xml          |  1 +
 .../ConfigurationTest-xinclude-schema-config.xml   |  1 +
 docs/user-manual/mqtt.adoc                         | 24 +++++++++++++++++++++-
 15 files changed, 99 insertions(+), 37 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 05c91889a7..1a6a8f47e7 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -679,6 +679,9 @@ public final class ActiveMQDefaultConfiguration {
    // How often (in ms) to scan for expired MQTT sessions
    private static long DEFAULT_MQTT_SESSION_SCAN_INTERVAL = 500;
 
+   // How long (in ms) to wait to persist MQTT session state
+   private static long DEFAULT_MQTT_SESSION_STATE_PERSISTENCE_TIMEOUT = 5000;
+
    // If SESSION-notifications should be suppressed or not
    public static boolean DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS = false;
 
@@ -1869,6 +1872,13 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_MQTT_SESSION_SCAN_INTERVAL;
    }
 
+   /**
+    * How long (in ms) to wait to persist MQTT session state
+    */
+   public static long getMqttSessionStatePersistenceTimeout() {
+      return DEFAULT_MQTT_SESSION_STATE_PERSISTENCE_TIMEOUT;
+   }
+
    public static boolean getDefaultSuppressSessionNotifications() {
       return DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS;
    }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
index ca4cb9f7d3..c15bdac7d8 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
@@ -64,4 +64,7 @@ public interface MQTTLogger {
 
    @LogMessage(id = 834009, value = "Ignoring duplicate MQTT QoS2 PUBLISH 
packet for packet ID {} from client with ID {}.", level = LogMessage.Level.WARN)
    void ignoringQoS2Publish(String clientId, long packetId);
+
+   @LogMessage(id = 834010, value = "Unable to scan MQTT sessions", level = 
LogMessage.Level.ERROR)
+   void unableToScanSessions(Exception e);
 }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
index 2264dbf3d5..9d65b1de8d 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
@@ -22,7 +22,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
-import org.apache.activemq.artemis.core.protocol.ProtocolHandler;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -79,16 +78,11 @@ public class MQTTProtocolManagerFactory extends 
AbstractProtocolManagerFactory<M
       }
       @Override
       public void run() {
-         server.getRemotingService().getAcceptors().forEach((key, acceptor) -> 
{
-            ProtocolHandler protocolHandler = acceptor.getProtocolHandler();
-            if (protocolHandler != null) {
-               protocolHandler.getProtocolMap().values().forEach(m -> {
-                  if (m instanceof MQTTProtocolManager) {
-                     ((MQTTProtocolManager)m).getStateManager().scanSessions();
-                  }
-               });
-            }
-         });
+         try {
+            MQTTStateManager.getInstance(server).scanSessions();
+         } catch (Exception e) {
+            MQTTLogger.LOGGER.unableToScanSessions(e);
+         }
       }
    }
 }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 3cc08dcf63..2570cabcd6 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -47,14 +47,12 @@ public class MQTTSessionState {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-   public static final MQTTSessionState DEFAULT = new 
MQTTSessionState((String) null, null);
+   public static final MQTTSessionState DEFAULT = new 
MQTTSessionState((String) null);
 
    private MQTTSession session;
 
    private final String clientId;
 
-   private final MQTTStateManager stateManager;
-
    private final ConcurrentMap<String, Pair<MqttTopicSubscription, Integer>> 
subscriptions = new ConcurrentHashMap<>();
 
    // Used to store Packet ID of Publish QoS1 and QoS2 message.  See spec: 
4.3.3 QoS 2: Exactly once delivery.  Method B.
@@ -98,9 +96,8 @@ public class MQTTSessionState {
 
    private Map<String, Integer> serverTopicAliases;
 
-   public MQTTSessionState(String clientId, MQTTStateManager stateManager) {
+   public MQTTSessionState(String clientId) {
       this.clientId = clientId;
-      this.stateManager = stateManager;
    }
 
    /**
@@ -119,12 +116,10 @@ public class MQTTSessionState {
     *  - int (nullable): subscription identifier
     *
     * @param message the message holding the MQTT session data
-    * @param stateManager the manager used to add and remove sessions from 
storage
     */
-   public MQTTSessionState(CoreMessage message, MQTTStateManager stateManager) 
{
+   public MQTTSessionState(CoreMessage message) {
       logger.debug("Deserializing MQTT session state from {}", message);
       this.clientId = message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
-      this.stateManager = stateManager;
       ActiveMQBuffer buf = message.getDataBuffer();
 
       // no need to use the version at this point
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
index 89705f067c..603fa1e1c7 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
@@ -54,6 +54,7 @@ public class MQTTStateManager {
    private final Queue sessionStore;
    private static Map<Integer, MQTTStateManager> INSTANCES = new HashMap<>();
    private final Map<String, MQTTConnection> connectedClients  = new 
ConcurrentHashMap<>();
+   private final long timeout;
 
    /*
     * Even though there may be multiple instances of MQTTProtocolManager (e.g. 
for MQTT on different ports) we only want
@@ -76,20 +77,19 @@ public class MQTTStateManager {
 
    private MQTTStateManager(ActiveMQServer server) throws Exception {
       this.server = server;
-      sessionStore = server.createQueue(new 
QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true),
 true);
+      this.timeout = 
server.getConfiguration().getMqttSessionStatePersistenceTimeout();
+      this.sessionStore = server.createQueue(new 
QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true),
 true);
 
       // load session data from queue
       try (LinkedListIterator<MessageReference> iterator = 
sessionStore.browserIterator()) {
-         try {
-            while (iterator.hasNext()) {
-               MessageReference ref = iterator.next();
-               String clientId = 
ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME);
-               MQTTSessionState sessionState = new 
MQTTSessionState((CoreMessage) ref.getMessage(), this);
-               sessionStates.put(clientId, sessionState);
-            }
-         } catch (NoSuchElementException ignored) {
-            // this could happen through paging browsing
+         while (iterator.hasNext()) {
+            MessageReference ref = iterator.next();
+            String clientId = 
ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME);
+            MQTTSessionState sessionState = new MQTTSessionState((CoreMessage) 
ref.getMessage());
+            sessionStates.put(clientId, sessionState);
          }
+      } catch (NoSuchElementException ignored) {
+         // this could happen through paging browsing
       }
    }
 
@@ -127,10 +127,9 @@ public class MQTTStateManager {
       if (sessionStates.containsKey(clientId)) {
          return sessionStates.get(clientId);
       } else {
-         MQTTSessionState sessionState = new MQTTSessionState(clientId, this);
+         MQTTSessionState sessionState = new MQTTSessionState(clientId);
          logger.debug("Adding MQTT session state for: {}", clientId);
          sessionStates.put(clientId, sessionState);
-         storeSessionState(sessionState);
          return sessionState;
       }
    }
@@ -175,7 +174,6 @@ public class MQTTStateManager {
          }
       });
       tx.commit();
-      final long timeout = 5000;
       if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
          throw MQTTBundle.BUNDLE.unableToStoreMqttState(timeout);
       }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
index 11d2f025bf..51b6ad5845 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
@@ -34,7 +34,7 @@ public class StateSerDeTest {
    public void testSerDe() throws Exception {
       for (int i = 0; i < 500; i++) {
          String clientId = RandomUtil.randomString();
-         MQTTSessionState unserialized = new MQTTSessionState(clientId, null);
+         MQTTSessionState unserialized = new MQTTSessionState(clientId);
          Integer subscriptionIdentifier = RandomUtil.randomPositiveIntOrNull();
          for (int j = 0; j < RandomUtil.randomInterval(1, 50); j++) {
             MqttTopicSubscription sub = new 
MqttTopicSubscription(RandomUtil.randomString(),
@@ -46,7 +46,7 @@ public class StateSerDeTest {
          }
 
          CoreMessage serializedState = 
MQTTStateManager.serializeState(unserialized, 0);
-         MQTTSessionState deserialized = new MQTTSessionState(serializedState, 
null);
+         MQTTSessionState deserialized = new MQTTSessionState(serializedState);
 
          assertEquals(unserialized.getClientId(), deserialized.getClientId());
          for (Pair<MqttTopicSubscription, Integer> unserializedEntry : 
unserialized.getSubscriptionsPlusID()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 46259d940e..2ed4ebca82 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1445,8 +1445,8 @@ public interface Configuration {
    Configuration setTemporaryQueueNamespace(String temporaryQueueNamespace);
 
    /**
-    * This is specific to MQTT, and it's necessary because the session scan 
interval is a broker-wide setting and can't
-    * be set on a per-connector basis like the rest of the MQTT-specific 
settings.
+    * This is necessary because the MQTT session scan interval is a 
broker-wide setting and can't be set on a
+    * per-connector basis like most of the other MQTT-specific settings.
     */
    Configuration setMqttSessionScanInterval(long mqttSessionScanInterval);
 
@@ -1457,6 +1457,19 @@ public interface Configuration {
     */
    long getMqttSessionScanInterval();
 
+   /**
+    * This is necessary because MQTT sessions and handled on a broker-wide 
basis so this can't be set on a per-connector
+    * basis like most of the other MQTT-specific settings.
+    */
+   Configuration setMqttSessionStatePersistenceTimeout(long 
mqttSessionStatePersistenceTimeout);
+
+   /**
+    * @see Configuration#setMqttSessionStatePersistenceTimeout(long)
+    *
+    * @return
+    */
+   long getMqttSessionStatePersistenceTimeout();
+
    /**
     * Returns whether suppression of session-notifications is enabled for this 
server. <br>
     * Default value is {@link 
org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS}.
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 993717c45f..7212f061c9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -423,6 +423,8 @@ public class ConfigurationImpl implements Configuration, 
Serializable {
 
    private long mqttSessionScanInterval = 
ActiveMQDefaultConfiguration.getMqttSessionScanInterval();
 
+   private long mqttSessionStatePersistenceTimeout = 
ActiveMQDefaultConfiguration.getMqttSessionStatePersistenceTimeout();
+
    private boolean suppressSessionNotifications = 
ActiveMQDefaultConfiguration.getDefaultSuppressSessionNotifications();
 
    private String literalMatchMarkers = 
ActiveMQDefaultConfiguration.getLiteralMatchMarkers();
@@ -3216,6 +3218,17 @@ public class ConfigurationImpl implements Configuration, 
Serializable {
       return this;
    }
 
+   @Override
+   public long getMqttSessionStatePersistenceTimeout() {
+      return mqttSessionStatePersistenceTimeout;
+   }
+
+   @Override
+   public Configuration setMqttSessionStatePersistenceTimeout(long 
mqttSessionStatePersistenceTimeout) {
+      this.mqttSessionStatePersistenceTimeout = 
mqttSessionStatePersistenceTimeout;
+      return this;
+   }
+
    @Override
    public boolean isSuppressSessionNotifications() {
       return suppressSessionNotifications;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 31acbac9db..7ec64918f4 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -491,6 +491,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       config.setMqttSessionScanInterval(getLong(e, 
"mqtt-session-scan-interval", config.getMqttSessionScanInterval(), GT_ZERO));
 
+      config.setMqttSessionStatePersistenceTimeout(getLong(e, 
"mqtt-session-state-persistence-timeout", 
config.getMqttSessionStatePersistenceTimeout(), GT_ZERO));
+
       long globalMaxSize = getTextBytesAsLongBytes(e, GLOBAL_MAX_SIZE, -1, 
MINUS_ONE_OR_GT_ZERO);
 
       if (globalMaxSize > 0) {
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 08230d6c2c..9f6a784f14 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -455,6 +455,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="mqtt-session-state-persistence-timeout" 
type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  how long (in ms) to wait to persist MQTT session state
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element ref="connectors" maxOccurs="1" minOccurs="0"/>
 
          <xsd:element ref="acceptors" maxOccurs="1" minOccurs="0"/>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 7f33160b4a..5216ba7a47 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -179,6 +179,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
       Assert.assertEquals(true, conf.isPopulateValidatedUser());
       Assert.assertEquals(false, conf.isRejectEmptyValidatedUser());
       Assert.assertEquals(123456, conf.getMqttSessionScanInterval());
+      Assert.assertEquals(567890, 
conf.getMqttSessionStatePersistenceTimeout());
       Assert.assertEquals(98765, conf.getConnectionTtlCheckInterval());
       Assert.assertEquals(1234567, conf.getConfigurationFileRefreshPeriod());
       Assert.assertEquals("TEMP", conf.getTemporaryQueueNamespace());
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 71de3f68ed..03e3e96cf8 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -57,6 +57,7 @@
       <populate-validated-user>true</populate-validated-user>
       <reject-empty-validated-user>false</reject-empty-validated-user>
       <mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
+      
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
       <connection-ttl-check-interval>98765</connection-ttl-check-interval>
       
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
       <temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 0898a89a4a..84bb539057 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -58,6 +58,7 @@
       <populate-validated-user>true</populate-validated-user>
       <reject-empty-validated-user>false</reject-empty-validated-user>
       <mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
+      
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
       <connection-ttl-check-interval>98765</connection-ttl-check-interval>
       
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
       <temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
index be0641fc9a..1adab32879 100644
--- 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
+++ 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
@@ -58,6 +58,7 @@
       <populate-validated-user>true</populate-validated-user>
       <reject-empty-validated-user>false</reject-empty-validated-user>
       <mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
+      
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
       <connection-ttl-check-interval>98765</connection-ttl-check-interval>
       
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
       <temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index 2704210459..0c02c47969 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -91,10 +91,32 @@ Payload logging is limited to avoid filling the logs with 
potentially hundreds o
 
 == Persistent Subscriptions
 
-The subscription information for MQTT sessions is stored in an internal queue 
named `$sys.mqtt.sessions` and persisted to disk (assuming persistence is 
enabled).
+The subscription information for MQTT sessions is stored in an internal queue 
named `$sys.mqtt.sessions` and persisted to storage (assuming persistence is 
enabled).
 The information is durable so that MQTT subscribers can reconnect and resume 
their subscriptions seamlessly after a broker restart, failure, etc.
 When brokers are configured for high availability this information will be 
available on the backup so even in the case of a broker fail-over subscribers 
will be able to resume their subscriptions.
 
+While persistent subscriptions can be convenient they impose a performance 
penalty since data must be written to storage.
+If you don't need the convenience (e.g. you always use clean sessions) and you 
don't want the performance penalty then you can disable it by disabling 
durability for the `$sys.mqtt.sessions` queue in `broker.xml`, e.g.:
+
+[,xml]
+----
+<addresses>
+   ...
+   <address name="$sys.mqtt.sessions">
+      <anycast>
+         <queue name="$sys.mqtt.sessions">
+            <durable>false</durable>
+         </queue>
+      </anycast>
+   </address>
+   ...
+</addresses>
+----
+
+The setting `mqtt-session-state-persistence-timeout` controls how long the 
broker will wait for the data to be written to storage before throwing an error.
+It is measured in milliseconds.
+The default is `5000`.
+
 == Custom Client ID Handling
 
 The client ID used by an MQTT application is very important as it uniquely 
identifies the application.

Reply via email to