This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 5784bdee03 ARTEMIS-5499 refactor MQTT subscription persistence
5784bdee03 is described below
commit 5784bdee032f33bfef10909b376bdb53e1b74c90
Author: Justin Bertram <[email protected]>
AuthorDate: Wed May 28 15:56:51 2025 -0500
ARTEMIS-5499 refactor MQTT subscription persistence
This commit refactors how the broker persists MQTT subscription data
since the current implementation was resulting in timeout issues and
might eventually lead to thread starvation due to a blocking call.
Since the code that might lead to timeouts was removed the associated
configuration property was deprecated.
Previously, disabling subscription persistence involved disabling the
underlying queue. This commit also adds a new configuration parameter
to disable subscription persistence is a more elegant way and updates
the documentation accordingly.
There are also a couple of optimizations to avoid unnecessary IO.
---
.../api/config/ActiveMQDefaultConfiguration.java | 10 +++
.../core/protocol/mqtt/MQTTConnectionManager.java | 2 +-
.../core/protocol/mqtt/MQTTProtocolHandler.java | 15 +++-
.../artemis/core/protocol/mqtt/MQTTSession.java | 12 ++-
.../core/protocol/mqtt/MQTTSessionState.java | 4 +-
.../core/protocol/mqtt/MQTTStateManager.java | 100 ++++++++++-----------
.../protocol/mqtt/MQTTSubscriptionManager.java | 16 +++-
.../artemis/core/config/Configuration.java | 21 +++--
.../core/config/impl/ConfigurationImpl.java | 13 +++
.../deployers/impl/FileConfigurationParser.java | 6 +-
.../core/server/impl/ServerSessionImpl.java | 4 +-
.../artemis/utils/XMLConfigurationUtil.java | 12 +++
.../resources/schema/artemis-configuration.xsd | 10 ++-
.../core/config/impl/FileConfigurationTest.java | 1 +
.../resources/ConfigurationTest-full-config.xml | 1 +
.../ConfigurationTest-xinclude-config.xml | 1 +
.../ConfigurationTest-xinclude-schema-config.xml | 1 +
docs/user-manual/mqtt.adoc | 28 +++---
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 43 +++++++++
19 files changed, 215 insertions(+), 85 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 719788994d..fe5353ff6c 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -689,6 +689,9 @@ public final class ActiveMQDefaultConfiguration {
// How long (in ms) to wait to persist MQTT session state
private static long DEFAULT_MQTT_SESSION_STATE_PERSISTENCE_TIMEOUT = 5000;
+ // Whether to persist MQTT subscriptions
+ private static boolean DEFAULT_MQTT_SUBSCRIPTION_PERSISTENCE_ENABLED = true;
+
// If SESSION-notifications should be suppressed or not
public static boolean DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS = false;
@@ -1953,6 +1956,13 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MQTT_SESSION_STATE_PERSISTENCE_TIMEOUT;
}
+ /**
+ * Whether to persist MQTT subscriptions
+ */
+ public static boolean getMqttSubscriptionPersistenceEnabled() {
+ return DEFAULT_MQTT_SUBSCRIPTION_PERSISTENCE_ENABLED;
+ }
+
public static boolean getDefaultSuppressSessionNotifications() {
return DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS;
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 3890ab92ff..a4a12cb22a 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -168,7 +168,7 @@ public class MQTTConnectionManager {
null,
session.getSessionCallback(),
MQTTUtil.SESSION_AUTO_CREATE_QUEUE,
-
server.newOperationContext(),
+
session.getSessionContext(),
session.getProtocolManager().getPrefixes(),
session.getProtocolManager().getSecurityDomain(),
validatedUser,
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index c6da66b0bf..447295c229 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -44,6 +44,7 @@ import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.io.IOCallback;
import
org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException;
import
org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -93,7 +94,8 @@ public class MQTTProtocolHandler extends
ChannelInboundHandlerAdapter {
void setConnection(MQTTConnection connection, ConnectionEntry entry) throws
Exception {
this.connectionEntry = entry;
this.connection = connection;
- this.session = new MQTTSession(this, connection, protocolManager,
server.getConfiguration().getWildcardConfiguration());
+ this.session = new MQTTSession(this, connection, protocolManager,
server.getConfiguration().getWildcardConfiguration(),
server.newOperationContext());
+ server.getStorageManager().setContext(session.getSessionContext());
}
void stop() {
@@ -406,7 +408,16 @@ public class MQTTProtocolHandler extends
ChannelInboundHandlerAdapter {
return;
}
MQTTUtil.logMessage(session.getState(), message, false,
session.getVersion());
- ctx.writeAndFlush(message, ctx.voidPromise());
+ server.getStorageManager().afterCompleteOperations(new IOCallback() {
+ @Override
+ public void done() {
+ ctx.writeAndFlush(message, ctx.voidPromise());
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
}
private int getMessageId(MqttMessage message) {
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index eadd91097b..776d370704 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -27,6 +27,7 @@ import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -76,16 +77,19 @@ public class MQTTSession {
private boolean usingServerKeepAlive = false;
+ private OperationContext sessionContext;
+
public MQTTSession(MQTTProtocolHandler protocolHandler,
MQTTConnection connection,
MQTTProtocolManager protocolManager,
- WildcardConfiguration wildcardConfiguration) throws
Exception {
+ WildcardConfiguration wildcardConfiguration,
+ OperationContext sessionContext) throws Exception {
this.protocolHandler = protocolHandler;
this.protocolManager = protocolManager;
this.stateManager = protocolManager.getStateManager();
this.wildcardConfiguration = wildcardConfiguration;
-
this.connection = connection;
+ this.sessionContext = sessionContext;
mqttConnectionManager = new MQTTConnectionManager(this);
mqttPublishManager = new MQTTPublishManager(this,
protocolManager.isCloseMqttConnectionOnPublishAuthorizationFailure());
@@ -251,6 +255,10 @@ public class MQTTSession {
this.wildcardConfiguration = wildcardConfiguration;
}
+ public OperationContext getSessionContext() {
+ return sessionContext;
+ }
+
public CoreMessageObjectPools getCoreMessageObjectPools() {
return coreMessageObjectPools;
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index c29163fc03..12918bf7b4 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -101,7 +101,7 @@ public class MQTTSessionState {
}
/**
- * This constructor deserializes session data from a message. The format is
as follows.
+ * This constructor deserializes subscription data from a message. The
format is as follows.
* <ul>
* <li>byte: version
* <li>int: subscription count
@@ -119,7 +119,7 @@ public class MQTTSessionState {
* @param message the message holding the MQTT session data
*/
public MQTTSessionState(CoreMessage message) {
- logger.debug("Deserializing MQTT session state from {}", message);
+ logger.debug("Deserializing MQTT subscriptions from {}", message);
this.clientId = message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
ActiveMQBuffer buf = message.getDataBuffer();
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
index fc2513c918..7d536a884b 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
@@ -25,8 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -39,7 +37,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.transaction.Transaction;
-import
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.slf4j.Logger;
@@ -53,7 +50,7 @@ public class MQTTStateManager {
private final Queue sessionStore;
private static final Map<Integer, MQTTStateManager> INSTANCES = new
HashMap<>();
private final Map<String, MQTTConnection> connectedClients = new
ConcurrentHashMap<>();
- private final long timeout;
+ private final boolean subscriptionPersistenceEnabled;
/*
* Even though there may be multiple instances of MQTTProtocolManager (e.g.
for MQTT on different ports) we only want
@@ -76,33 +73,37 @@ public class MQTTStateManager {
private MQTTStateManager(ActiveMQServer server) throws Exception {
this.server = server;
- this.timeout =
server.getConfiguration().getMqttSessionStatePersistenceTimeout();
- this.sessionStore =
server.createQueue(QueueConfiguration.of(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true),
true);
-
- // load session data from queue
- try (LinkedListIterator<MessageReference> iterator =
sessionStore.browserIterator()) {
- while (iterator.hasNext()) {
- Message message = iterator.next().getMessage();
- if (!(message instanceof CoreMessage)) {
-
MQTTLogger.LOGGER.sessionStateMessageIncorrectType(message.getClass().getName());
- continue;
- }
- String clientId =
message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
- if (clientId == null || clientId.isEmpty()) {
- MQTTLogger.LOGGER.sessionStateMessageBadClientId();
- continue;
- }
- MQTTSessionState sessionState;
- try {
- sessionState = new MQTTSessionState((CoreMessage) message);
- } catch (Exception e) {
- MQTTLogger.LOGGER.errorDeserializingStateMessage(e);
- continue;
+ this.subscriptionPersistenceEnabled =
server.getConfiguration().isMqttSubscriptionPersistenceEnabled();
+ if (subscriptionPersistenceEnabled) {
+ this.sessionStore =
server.createQueue(QueueConfiguration.of(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true),
true);
+
+ // load subscription data from queue
+ try (LinkedListIterator<MessageReference> iterator =
sessionStore.browserIterator()) {
+ while (iterator.hasNext()) {
+ Message message = iterator.next().getMessage();
+ if (!(message instanceof CoreMessage)) {
+
MQTTLogger.LOGGER.sessionStateMessageIncorrectType(message.getClass().getName());
+ continue;
+ }
+ String clientId =
message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
+ if (clientId == null || clientId.isEmpty()) {
+ MQTTLogger.LOGGER.sessionStateMessageBadClientId();
+ continue;
+ }
+ MQTTSessionState sessionState;
+ try {
+ sessionState = new MQTTSessionState((CoreMessage) message);
+ } catch (Exception e) {
+ MQTTLogger.LOGGER.errorDeserializingStateMessage(e);
+ continue;
+ }
+ sessionStates.put(clientId, sessionState);
}
- sessionStates.put(clientId, sessionState);
+ } catch (NoSuchElementException ignored) {
+ // this could happen through paging browsing
}
- } catch (NoSuchElementException ignored) {
- // this could happen through paging browsing
+ } else {
+ this.sessionStore = null;
}
}
@@ -152,13 +153,18 @@ public class MQTTStateManager {
if (clientId == null) {
return null;
}
- removeDurableSessionState(clientId);
- return sessionStates.remove(clientId);
+ MQTTSessionState removed = sessionStates.remove(clientId);
+ if (removed != null && removed.getSubscriptions().size() > 0) {
+ removeDurableSubscriptionState(clientId);
+ }
+ return removed;
}
- public void removeDurableSessionState(String clientId) throws Exception {
- int deletedCount =
sessionStore.deleteMatchingReferences(FilterImpl.createFilter(new
StringBuilder(Message.HDR_LAST_VALUE_NAME).append(" =
'").append(clientId).append("'").toString()));
- logger.debug("Removed {} durable MQTT state records for: {}",
deletedCount, clientId);
+ public void removeDurableSubscriptionState(String clientId) throws
Exception {
+ if (subscriptionPersistenceEnabled) {
+ int deletedCount =
sessionStore.deleteMatchingReferences(FilterImpl.createFilter(new
StringBuilder(Message.HDR_LAST_VALUE_NAME).append(" =
'").append(clientId).append("'").toString()));
+ logger.debug("Removed {} durable MQTT subscription record(s) for:
{}", deletedCount, clientId);
+ }
}
public Map<String, MQTTSessionState> getSessionStates() {
@@ -170,25 +176,13 @@ public class MQTTStateManager {
return "MQTTSessionStateManager@" +
Integer.toHexString(System.identityHashCode(this));
}
- public void storeSessionState(MQTTSessionState state) throws Exception {
- logger.debug("Adding durable MQTT state record for: {}",
state.getClientId());
-
- /*
- * It is imperative to ensure the routed message is actually *all the
way* on the queue before proceeding
- * otherwise there can be a race with removing it.
- */
- CountDownLatch latch = new CountDownLatch(1);
- Transaction tx = new TransactionImpl(server.getStorageManager());
- server.getPostOffice().route(serializeState(state,
server.getStorageManager().generateID()), tx, false);
- tx.addOperation(new TransactionOperationAbstract() {
- @Override
- public void afterCommit(Transaction tx) {
- latch.countDown();
- }
- });
- tx.commit();
- if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
- throw MQTTBundle.BUNDLE.unableToStoreMqttState(timeout);
+ public void storeDurableSubscriptionState(MQTTSessionState state) throws
Exception {
+ if (subscriptionPersistenceEnabled) {
+ logger.debug("Adding durable MQTT subscription record for: {}",
state.getClientId());
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+ tx.setAsync(true);
+ server.getPostOffice().route(serializeState(state,
server.getStorageManager().generateID()), tx, false);
+ tx.commit();
}
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 68c2765fa8..db45491751 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -239,6 +239,10 @@ public class MQTTSubscriptionManager {
}
short[] removeSubscriptions(List<String> topics, boolean enforceSecurity)
throws Exception {
+ if (topics == null || topics.size() == 0) {
+ return new short[0];
+ }
+
short[] reasonCodes;
MQTTSessionState state = session.getState();
@@ -277,8 +281,14 @@ public class MQTTSubscriptionManager {
reasonCodes[i] = reasonCode;
}
- // store state after *all* requested subscriptions have been removed
in memory
- stateManager.storeSessionState(state);
+ // deal with durable state after *all* requested subscriptions have
been removed in memory
+ if (state.getSubscriptions().size() > 0) {
+ // if there are some subscriptions left then update the state
+ stateManager.storeDurableSubscriptionState(state);
+ } else {
+ // if there are no subscriptions left then remove the state
entirely
+ stateManager.removeDurableSubscriptionState(state.getClientId());
+ }
}
return reasonCodes;
@@ -327,7 +337,7 @@ public class MQTTSubscriptionManager {
}
// store state after *all* requested subscriptions have been created
in memory
- stateManager.storeSessionState(state);
+ stateManager.storeDurableSubscriptionState(state);
return qos;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 5f58cfcb3c..3526d0af76 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1420,18 +1420,29 @@ public interface Configuration {
long getMqttSessionScanInterval();
/**
- * This is necessary because MQTT sessions and handled on a broker-wide
basis so this can't be set on a per-connector
- * basis like most of the other MQTT-specific settings.
+ * @deprecated This is no longer used by the broker. See
+ * {@link Configuration#setMqttSubscriptionPersistenceEnabled(boolean)}.
*/
+ @Deprecated(forRemoval = true)
Configuration setMqttSessionStatePersistenceTimeout(long
mqttSessionStatePersistenceTimeout);
/**
- * Get the MQTT session state persistence timeout
- *
- * @see Configuration#setMqttSessionStatePersistenceTimeout
+ * @deprecated This is no longer used by the broker. See {@link
Configuration#isMqttSubscriptionPersistenceEnabled}.
*/
+ @Deprecated(forRemoval = true)
long getMqttSessionStatePersistenceTimeout();
+ /**
+ * This is necessary because MQTT subsriptions are handled on a broker-wide
basis so this can't be set on a
+ * per-connector basis like most of the other MQTT-specific settings.
+ */
+ Configuration setMqttSubscriptionPersistenceEnabled(boolean
mqttSubscriptionPersistenceEnabled);
+
+ /**
+ * @see Configuration#setMqttSubscriptionPersistenceEnabled
+ */
+ boolean isMqttSubscriptionPersistenceEnabled();
+
/**
* {@return whether suppression of session-notifications is enabled for
this server; default is {@link
* ActiveMQDefaultConfiguration#DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS}}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 303bf59a74..4063c29bda 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -446,6 +446,8 @@ public class ConfigurationImpl implements Configuration,
Serializable {
private long mqttSessionStatePersistenceTimeout =
ActiveMQDefaultConfiguration.getMqttSessionStatePersistenceTimeout();
+ private boolean mqttSessionStatePersistenceEnabled =
ActiveMQDefaultConfiguration.getMqttSubscriptionPersistenceEnabled();
+
private boolean suppressSessionNotifications =
ActiveMQDefaultConfiguration.getDefaultSuppressSessionNotifications();
private String literalMatchMarkers =
ActiveMQDefaultConfiguration.getLiteralMatchMarkers();
@@ -3504,6 +3506,17 @@ public class ConfigurationImpl implements Configuration,
Serializable {
return this;
}
+ @Override
+ public boolean isMqttSubscriptionPersistenceEnabled() {
+ return mqttSessionStatePersistenceEnabled;
+ }
+
+ @Override
+ public Configuration setMqttSubscriptionPersistenceEnabled(boolean
mqttSubscriptionPersistenceEnabled) {
+ this.mqttSessionStatePersistenceEnabled =
mqttSubscriptionPersistenceEnabled;
+ return this;
+ }
+
@Override
public boolean isSuppressSessionNotifications() {
return suppressSessionNotifications;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 35372fc4d7..fb84d04067 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -400,6 +400,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String INITIAL_QUEUE_BUFFER_SIZE =
"initial-queue-buffer-size";
+ private static final String MQTT_SUBSCRIPTION_PERSISTENCE_ENABLED =
"mqtt-subscription-persistence-enabled";
+
private boolean validateAIO = false;
private boolean printPageMaxSizeUsed = false;
@@ -511,7 +513,9 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
config.setMqttSessionScanInterval(getLong(e,
"mqtt-session-scan-interval", config.getMqttSessionScanInterval(), GT_ZERO));
- config.setMqttSessionStatePersistenceTimeout(getLong(e,
"mqtt-session-state-persistence-timeout",
config.getMqttSessionStatePersistenceTimeout(), GT_ZERO));
+ config.setMqttSessionStatePersistenceTimeout(getLong(e,
"mqtt-session-state-persistence-timeout",
config.getMqttSessionStatePersistenceTimeout(), GT_ZERO,
MQTT_SUBSCRIPTION_PERSISTENCE_ENABLED));
+
+ config.setMqttSubscriptionPersistenceEnabled(getBoolean(e,
MQTT_SUBSCRIPTION_PERSISTENCE_ENABLED,
config.isMqttSubscriptionPersistenceEnabled()));
config.setGlobalMaxSizePercentOfJvmMaxMemory(getInteger(e,
GLOBAL_MAX_SIZE_PERCENT_JVM_MAX_MEM,
config.getGlobalMaxSizePercentOfJvmMaxMemory(), GT_ZERO));
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index cbb3ff6f6a..4d42063db5 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -763,7 +763,9 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
AddressSettings as =
server.getAddressSettingsRepository().getMatch(queueConfiguration.getAddress().toString());
- if (as.isAutoCreateAddresses() &&
(server.getAddressInfo(queueConfiguration.getAddress()) == null ||
!server.getAddressInfo(queueConfiguration.getAddress()).getRoutingTypes().contains(queueConfiguration.getRoutingType())))
{
+ AddressInfo addressInfo =
server.getAddressInfo(queueConfiguration.getAddress());
+
+ if (as.isAutoCreateAddresses() && addressInfo == null ||
!addressInfo.getRoutingTypes().contains(queueConfiguration.getRoutingType())) {
securityCheck(queueConfiguration.getAddress(),
queueConfiguration.getName(), CheckType.CREATE_ADDRESS, this);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/utils/XMLConfigurationUtil.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/utils/XMLConfigurationUtil.java
index 0c64df604b..3ea9395250 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/utils/XMLConfigurationUtil.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/utils/XMLConfigurationUtil.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.utils;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
@@ -69,8 +70,19 @@ public class XMLConfigurationUtil {
final String name,
final long def,
final Validator<Number> validator) {
+ return getLong(e, name, def, validator, null);
+ }
+
+ public static final Long getLong(final Element e,
+ final String name,
+ final long def,
+ final Validator<Number> validator,
+ final String alternativeForDeprecated) {
NodeList nl = e.getElementsByTagName(name);
if (nl.getLength() > 0) {
+ if (alternativeForDeprecated != null) {
+ ActiveMQServerLogger.LOGGER.deprecatedConfigurationOption(name,
alternativeForDeprecated);
+ }
return (Long) validator.validate(name, XMLUtil.parseLong(nl.item(0)));
} else {
return def;
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 1edb3b5219..be4b57e59a 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -466,7 +466,15 @@
<xsd:element name="mqtt-session-state-persistence-timeout"
type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
- how long (in ms) to wait to persist MQTT session state
+ DEPRECATED: how long (in ms) to wait to persist MQTT session
state
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="mqtt-subscription-persistence-enabled"
type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ whether to persist MQTT subscriptions
</xsd:documentation>
</xsd:annotation>
</xsd:element>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 490d491fff..24c32aaf9d 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -246,6 +246,7 @@ public class FileConfigurationTest extends
AbstractConfigurationTestBase {
assertFalse(configInstance.isRejectEmptyValidatedUser());
assertEquals(123456, configInstance.getMqttSessionScanInterval());
assertEquals(567890,
configInstance.getMqttSessionStatePersistenceTimeout());
+ assertFalse(configInstance.isMqttSubscriptionPersistenceEnabled());
assertEquals(98765, configInstance.getConnectionTtlCheckInterval());
assertEquals(1234567,
configInstance.getConfigurationFileRefreshPeriod());
assertEquals("UUID", configInstance.getTemporaryQueueNamespace());
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index f76d065fdd..f8744d4154 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -58,6 +58,7 @@
<reject-empty-validated-user>false</reject-empty-validated-user>
<mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
+
<mqtt-subscription-persistence-enabled>false</mqtt-subscription-persistence-enabled>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 2545c1d437..e6f3e827f0 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -59,6 +59,7 @@
<reject-empty-validated-user>false</reject-empty-validated-user>
<mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
+
<mqtt-subscription-persistence-enabled>false</mqtt-subscription-persistence-enabled>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
index 05e3295841..eec3aae8e7 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
@@ -59,6 +59,7 @@
<reject-empty-validated-user>false</reject-empty-validated-user>
<mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
+
<mqtt-subscription-persistence-enabled>false</mqtt-subscription-persistence-enabled>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index 59f94a50a3..5cadbaa7cb 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -92,29 +92,29 @@ Payload logging is limited to avoid filling the logs with
potentially hundreds o
== Persistent Subscriptions
-The subscription information for MQTT sessions is stored in an internal queue
named `$sys.mqtt.sessions` and persisted to storage (assuming persistence is
enabled).
-The information is durable so that MQTT subscribers can reconnect and resume
their subscriptions seamlessly after a broker restart, failure, etc.
+The subscription information for MQTT sessions is kept in an internal queue
named `$sys.mqtt.sessions` and persisted to storage (assuming persistence is
enabled).
+The information is durable so that MQTT subscribers can reconnect and resume
their subscriptions seamlessly after a broker restart, failure, etc. without
having to resend a `SUBSCRIBE` packet.
When brokers are configured for high availability this information will be
available on the backup so even in the case of a broker fail-over subscribers
will be able to resume their subscriptions.
-While persistent subscriptions can be convenient they impose a performance
penalty since data must be written to storage.
-If you don't need the convenience (e.g. you always use clean sessions) and/or
you don't want the performance penalty then you can disable it by disabling the
`$sys.mqtt.sessions` queue in `broker.xml`, e.g.:
+While persistent subscriptions can be convenient they impose a performance
penalty since data must be written to and removed from storage.
+If you don't need the convenience (e.g. you always use clean sessions) and/or
you don't want the performance penalty then you can disable it by setting
`mqtt-subscription-persistence-enabled` to `false` in `broker.xml`, e.g.:
[,xml]
----
-<addresses>
+<core>
...
- <address name="$sys.mqtt.sessions">
- <anycast>
- <queue name="$sys.mqtt.sessions" enabled="false"/>
- </anycast>
- </address>
+
<mqtt-subscription-persistence-enabled>false</mqtt-subscription-persistence-enabled>
...
-</addresses>
+</core>
----
-The setting `mqtt-session-state-persistence-timeout` controls how long the
broker will wait for the data to be written to storage before throwing an error.
-It is measured in milliseconds.
-The default is `5000`.
+The default is `true`.
+
+[NOTE]
+====
+Even if `mqtt-subscription-persistence-enabled` is `false` the broker will
still keep track of QoS 1 & 2 messages.
+The _only_ impact of disabling MQTT subscription persistence is that clients
will have to resend `SUBSCRIBE` packets as necessary in order to continue
receiving messages after reconnecting after the server is restarted.
+====
== Custom Client ID Handling
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index d23a3959d8..187b76fc7c 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -24,6 +24,8 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -56,6 +58,7 @@ import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
+import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
@@ -871,4 +874,44 @@ public class MQTT5Test extends MQTT5TestSupport {
Wait.assertEquals(1L, () -> subscriptionQueue.getMessagesAcknowledged(),
2000, 50);
Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 2000,
50);
}
+
+ @Test
+ @Timeout(DEFAULT_TIMEOUT_SEC)
+ public void testLoadOnSubscriptionPersistence() throws Exception {
+ String topic = RandomUtil.randomUUIDString();
+ int numberOfThreads = 250;
+ AtomicBoolean failed = new AtomicBoolean(false);
+ ExecutorService executorService =
Executors.newFixedThreadPool(numberOfThreads);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch latch = new CountDownLatch(numberOfThreads);
+
+ for (int i = 0; i < numberOfThreads; i++) {
+ executorService.submit(() -> {
+ MqttClient subscriber = null;
+ try {
+ subscriber = createPahoClient(RandomUtil.randomUUIDString());
+ subscriber.connect();
+ subscriber.subscribe(topic, AT_LEAST_ONCE);
+ subscriber.unsubscribe(topic);
+ } catch (MqttException e) {
+ logger.error(e.getMessage(), e);
+ failed.set(true);
+ } finally {
+ if (subscriber != null) {
+ try {
+ subscriber.disconnect();
+ subscriber.close();
+ } catch (MqttException e) {
+ // ignore
+ }
+ }
+ latch.countDown();
+ }
+ });
+ }
+
+ assertTrue(latch.await(1, TimeUnit.MINUTES), "not all tasks finished");
+ assertFalse(failed.get());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact