This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 019fc86138 ARTEMIS-4542 improve MQTT state storage
019fc86138 is described below
commit 019fc86138239cc34e84dd4c90326e6615e75e6b
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Dec 21 11:32:28 2023 -0600
ARTEMIS-4542 improve MQTT state storage
This commit:
- Eliminates MQTT session storage on every successful connection.
Instead data is only written when subsriptions are created or
destroyed.
- Adds a configuration property for the storage timeout.
- Updates the documentation with relevant information.
- Refactors a few bits of code to eliminate unnecessary variables, etc.
---
.../api/config/ActiveMQDefaultConfiguration.java | 10 +++++++++
.../artemis/core/protocol/mqtt/MQTTLogger.java | 3 +++
.../protocol/mqtt/MQTTProtocolManagerFactory.java | 16 +++++----------
.../core/protocol/mqtt/MQTTSessionState.java | 11 +++-------
.../core/protocol/mqtt/MQTTStateManager.java | 24 ++++++++++------------
.../artemis/core/protocol/mqtt/StateSerDeTest.java | 4 ++--
.../artemis/core/config/Configuration.java | 17 +++++++++++++--
.../core/config/impl/ConfigurationImpl.java | 13 ++++++++++++
.../deployers/impl/FileConfigurationParser.java | 2 ++
.../resources/schema/artemis-configuration.xsd | 8 ++++++++
.../core/config/impl/FileConfigurationTest.java | 1 +
.../resources/ConfigurationTest-full-config.xml | 1 +
.../ConfigurationTest-xinclude-config.xml | 1 +
.../ConfigurationTest-xinclude-schema-config.xml | 1 +
docs/user-manual/mqtt.adoc | 24 +++++++++++++++++++++-
15 files changed, 99 insertions(+), 37 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 05c91889a7..1a6a8f47e7 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -679,6 +679,9 @@ public final class ActiveMQDefaultConfiguration {
// How often (in ms) to scan for expired MQTT sessions
private static long DEFAULT_MQTT_SESSION_SCAN_INTERVAL = 500;
+ // How long (in ms) to wait to persist MQTT session state
+ private static long DEFAULT_MQTT_SESSION_STATE_PERSISTENCE_TIMEOUT = 5000;
+
// If SESSION-notifications should be suppressed or not
public static boolean DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS = false;
@@ -1869,6 +1872,13 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MQTT_SESSION_SCAN_INTERVAL;
}
+ /**
+ * How long (in ms) to wait to persist MQTT session state
+ */
+ public static long getMqttSessionStatePersistenceTimeout() {
+ return DEFAULT_MQTT_SESSION_STATE_PERSISTENCE_TIMEOUT;
+ }
+
public static boolean getDefaultSuppressSessionNotifications() {
return DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS;
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
index ca4cb9f7d3..c15bdac7d8 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
@@ -64,4 +64,7 @@ public interface MQTTLogger {
@LogMessage(id = 834009, value = "Ignoring duplicate MQTT QoS2 PUBLISH
packet for packet ID {} from client with ID {}.", level = LogMessage.Level.WARN)
void ignoringQoS2Publish(String clientId, long packetId);
+
+ @LogMessage(id = 834010, value = "Unable to scan MQTT sessions", level =
LogMessage.Level.ERROR)
+ void unableToScanSessions(Exception e);
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
index 2264dbf3d5..9d65b1de8d 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
@@ -22,7 +22,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
-import org.apache.activemq.artemis.core.protocol.ProtocolHandler;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -79,16 +78,11 @@ public class MQTTProtocolManagerFactory extends
AbstractProtocolManagerFactory<M
}
@Override
public void run() {
- server.getRemotingService().getAcceptors().forEach((key, acceptor) ->
{
- ProtocolHandler protocolHandler = acceptor.getProtocolHandler();
- if (protocolHandler != null) {
- protocolHandler.getProtocolMap().values().forEach(m -> {
- if (m instanceof MQTTProtocolManager) {
- ((MQTTProtocolManager)m).getStateManager().scanSessions();
- }
- });
- }
- });
+ try {
+ MQTTStateManager.getInstance(server).scanSessions();
+ } catch (Exception e) {
+ MQTTLogger.LOGGER.unableToScanSessions(e);
+ }
}
}
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 3cc08dcf63..2570cabcd6 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -47,14 +47,12 @@ public class MQTTSessionState {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final MQTTSessionState DEFAULT = new
MQTTSessionState((String) null, null);
+ public static final MQTTSessionState DEFAULT = new
MQTTSessionState((String) null);
private MQTTSession session;
private final String clientId;
- private final MQTTStateManager stateManager;
-
private final ConcurrentMap<String, Pair<MqttTopicSubscription, Integer>>
subscriptions = new ConcurrentHashMap<>();
// Used to store Packet ID of Publish QoS1 and QoS2 message. See spec:
4.3.3 QoS 2: Exactly once delivery. Method B.
@@ -98,9 +96,8 @@ public class MQTTSessionState {
private Map<String, Integer> serverTopicAliases;
- public MQTTSessionState(String clientId, MQTTStateManager stateManager) {
+ public MQTTSessionState(String clientId) {
this.clientId = clientId;
- this.stateManager = stateManager;
}
/**
@@ -119,12 +116,10 @@ public class MQTTSessionState {
* - int (nullable): subscription identifier
*
* @param message the message holding the MQTT session data
- * @param stateManager the manager used to add and remove sessions from
storage
*/
- public MQTTSessionState(CoreMessage message, MQTTStateManager stateManager)
{
+ public MQTTSessionState(CoreMessage message) {
logger.debug("Deserializing MQTT session state from {}", message);
this.clientId = message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
- this.stateManager = stateManager;
ActiveMQBuffer buf = message.getDataBuffer();
// no need to use the version at this point
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
index 89705f067c..603fa1e1c7 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
@@ -54,6 +54,7 @@ public class MQTTStateManager {
private final Queue sessionStore;
private static Map<Integer, MQTTStateManager> INSTANCES = new HashMap<>();
private final Map<String, MQTTConnection> connectedClients = new
ConcurrentHashMap<>();
+ private final long timeout;
/*
* Even though there may be multiple instances of MQTTProtocolManager (e.g.
for MQTT on different ports) we only want
@@ -76,20 +77,19 @@ public class MQTTStateManager {
private MQTTStateManager(ActiveMQServer server) throws Exception {
this.server = server;
- sessionStore = server.createQueue(new
QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true),
true);
+ this.timeout =
server.getConfiguration().getMqttSessionStatePersistenceTimeout();
+ this.sessionStore = server.createQueue(new
QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true),
true);
// load session data from queue
try (LinkedListIterator<MessageReference> iterator =
sessionStore.browserIterator()) {
- try {
- while (iterator.hasNext()) {
- MessageReference ref = iterator.next();
- String clientId =
ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME);
- MQTTSessionState sessionState = new
MQTTSessionState((CoreMessage) ref.getMessage(), this);
- sessionStates.put(clientId, sessionState);
- }
- } catch (NoSuchElementException ignored) {
- // this could happen through paging browsing
+ while (iterator.hasNext()) {
+ MessageReference ref = iterator.next();
+ String clientId =
ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME);
+ MQTTSessionState sessionState = new MQTTSessionState((CoreMessage)
ref.getMessage());
+ sessionStates.put(clientId, sessionState);
}
+ } catch (NoSuchElementException ignored) {
+ // this could happen through paging browsing
}
}
@@ -127,10 +127,9 @@ public class MQTTStateManager {
if (sessionStates.containsKey(clientId)) {
return sessionStates.get(clientId);
} else {
- MQTTSessionState sessionState = new MQTTSessionState(clientId, this);
+ MQTTSessionState sessionState = new MQTTSessionState(clientId);
logger.debug("Adding MQTT session state for: {}", clientId);
sessionStates.put(clientId, sessionState);
- storeSessionState(sessionState);
return sessionState;
}
}
@@ -175,7 +174,6 @@ public class MQTTStateManager {
}
});
tx.commit();
- final long timeout = 5000;
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
throw MQTTBundle.BUNDLE.unableToStoreMqttState(timeout);
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
index 11d2f025bf..51b6ad5845 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
@@ -34,7 +34,7 @@ public class StateSerDeTest {
public void testSerDe() throws Exception {
for (int i = 0; i < 500; i++) {
String clientId = RandomUtil.randomString();
- MQTTSessionState unserialized = new MQTTSessionState(clientId, null);
+ MQTTSessionState unserialized = new MQTTSessionState(clientId);
Integer subscriptionIdentifier = RandomUtil.randomPositiveIntOrNull();
for (int j = 0; j < RandomUtil.randomInterval(1, 50); j++) {
MqttTopicSubscription sub = new
MqttTopicSubscription(RandomUtil.randomString(),
@@ -46,7 +46,7 @@ public class StateSerDeTest {
}
CoreMessage serializedState =
MQTTStateManager.serializeState(unserialized, 0);
- MQTTSessionState deserialized = new MQTTSessionState(serializedState,
null);
+ MQTTSessionState deserialized = new MQTTSessionState(serializedState);
assertEquals(unserialized.getClientId(), deserialized.getClientId());
for (Pair<MqttTopicSubscription, Integer> unserializedEntry :
unserialized.getSubscriptionsPlusID()) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 46259d940e..2ed4ebca82 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1445,8 +1445,8 @@ public interface Configuration {
Configuration setTemporaryQueueNamespace(String temporaryQueueNamespace);
/**
- * This is specific to MQTT, and it's necessary because the session scan
interval is a broker-wide setting and can't
- * be set on a per-connector basis like the rest of the MQTT-specific
settings.
+ * This is necessary because the MQTT session scan interval is a
broker-wide setting and can't be set on a
+ * per-connector basis like most of the other MQTT-specific settings.
*/
Configuration setMqttSessionScanInterval(long mqttSessionScanInterval);
@@ -1457,6 +1457,19 @@ public interface Configuration {
*/
long getMqttSessionScanInterval();
+ /**
+ * This is necessary because MQTT sessions and handled on a broker-wide
basis so this can't be set on a per-connector
+ * basis like most of the other MQTT-specific settings.
+ */
+ Configuration setMqttSessionStatePersistenceTimeout(long
mqttSessionStatePersistenceTimeout);
+
+ /**
+ * @see Configuration#setMqttSessionStatePersistenceTimeout(long)
+ *
+ * @return
+ */
+ long getMqttSessionStatePersistenceTimeout();
+
/**
* Returns whether suppression of session-notifications is enabled for this
server. <br>
* Default value is {@link
org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS}.
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 993717c45f..7212f061c9 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -423,6 +423,8 @@ public class ConfigurationImpl implements Configuration,
Serializable {
private long mqttSessionScanInterval =
ActiveMQDefaultConfiguration.getMqttSessionScanInterval();
+ private long mqttSessionStatePersistenceTimeout =
ActiveMQDefaultConfiguration.getMqttSessionStatePersistenceTimeout();
+
private boolean suppressSessionNotifications =
ActiveMQDefaultConfiguration.getDefaultSuppressSessionNotifications();
private String literalMatchMarkers =
ActiveMQDefaultConfiguration.getLiteralMatchMarkers();
@@ -3216,6 +3218,17 @@ public class ConfigurationImpl implements Configuration,
Serializable {
return this;
}
+ @Override
+ public long getMqttSessionStatePersistenceTimeout() {
+ return mqttSessionStatePersistenceTimeout;
+ }
+
+ @Override
+ public Configuration setMqttSessionStatePersistenceTimeout(long
mqttSessionStatePersistenceTimeout) {
+ this.mqttSessionStatePersistenceTimeout =
mqttSessionStatePersistenceTimeout;
+ return this;
+ }
+
@Override
public boolean isSuppressSessionNotifications() {
return suppressSessionNotifications;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 31acbac9db..7ec64918f4 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -491,6 +491,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
config.setMqttSessionScanInterval(getLong(e,
"mqtt-session-scan-interval", config.getMqttSessionScanInterval(), GT_ZERO));
+ config.setMqttSessionStatePersistenceTimeout(getLong(e,
"mqtt-session-state-persistence-timeout",
config.getMqttSessionStatePersistenceTimeout(), GT_ZERO));
+
long globalMaxSize = getTextBytesAsLongBytes(e, GLOBAL_MAX_SIZE, -1,
MINUS_ONE_OR_GT_ZERO);
if (globalMaxSize > 0) {
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 08230d6c2c..9f6a784f14 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -455,6 +455,14 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="mqtt-session-state-persistence-timeout"
type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ how long (in ms) to wait to persist MQTT session state
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element ref="connectors" maxOccurs="1" minOccurs="0"/>
<xsd:element ref="acceptors" maxOccurs="1" minOccurs="0"/>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 7f33160b4a..5216ba7a47 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -179,6 +179,7 @@ public class FileConfigurationTest extends
ConfigurationImplTest {
Assert.assertEquals(true, conf.isPopulateValidatedUser());
Assert.assertEquals(false, conf.isRejectEmptyValidatedUser());
Assert.assertEquals(123456, conf.getMqttSessionScanInterval());
+ Assert.assertEquals(567890,
conf.getMqttSessionStatePersistenceTimeout());
Assert.assertEquals(98765, conf.getConnectionTtlCheckInterval());
Assert.assertEquals(1234567, conf.getConfigurationFileRefreshPeriod());
Assert.assertEquals("TEMP", conf.getTemporaryQueueNamespace());
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 71de3f68ed..03e3e96cf8 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -57,6 +57,7 @@
<populate-validated-user>true</populate-validated-user>
<reject-empty-validated-user>false</reject-empty-validated-user>
<mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
+
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 0898a89a4a..84bb539057 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -58,6 +58,7 @@
<populate-validated-user>true</populate-validated-user>
<reject-empty-validated-user>false</reject-empty-validated-user>
<mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
+
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
index be0641fc9a..1adab32879 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
@@ -58,6 +58,7 @@
<populate-validated-user>true</populate-validated-user>
<reject-empty-validated-user>false</reject-empty-validated-user>
<mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
+
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index 2704210459..0c02c47969 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -91,10 +91,32 @@ Payload logging is limited to avoid filling the logs with
potentially hundreds o
== Persistent Subscriptions
-The subscription information for MQTT sessions is stored in an internal queue
named `$sys.mqtt.sessions` and persisted to disk (assuming persistence is
enabled).
+The subscription information for MQTT sessions is stored in an internal queue
named `$sys.mqtt.sessions` and persisted to storage (assuming persistence is
enabled).
The information is durable so that MQTT subscribers can reconnect and resume
their subscriptions seamlessly after a broker restart, failure, etc.
When brokers are configured for high availability this information will be
available on the backup so even in the case of a broker fail-over subscribers
will be able to resume their subscriptions.
+While persistent subscriptions can be convenient they impose a performance
penalty since data must be written to storage.
+If you don't need the convenience (e.g. you always use clean sessions) and you
don't want the performance penalty then you can disable it by disabling
durability for the `$sys.mqtt.sessions` queue in `broker.xml`, e.g.:
+
+[,xml]
+----
+<addresses>
+ ...
+ <address name="$sys.mqtt.sessions">
+ <anycast>
+ <queue name="$sys.mqtt.sessions">
+ <durable>false</durable>
+ </queue>
+ </anycast>
+ </address>
+ ...
+</addresses>
+----
+
+The setting `mqtt-session-state-persistence-timeout` controls how long the
broker will wait for the data to be written to storage before throwing an error.
+It is measured in milliseconds.
+The default is `5000`.
+
== Custom Client ID Handling
The client ID used by an MQTT application is very important as it uniquely
identifies the application.