This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 5784bdee03 ARTEMIS-5499 refactor MQTT subscription persistence
5784bdee03 is described below

commit 5784bdee032f33bfef10909b376bdb53e1b74c90
Author: Justin Bertram <[email protected]>
AuthorDate: Wed May 28 15:56:51 2025 -0500

    ARTEMIS-5499 refactor MQTT subscription persistence
    
    This commit refactors how the broker persists MQTT subscription data
    since the current implementation was resulting in timeout issues and
    might eventually lead to thread starvation due to a blocking call.
    
    Since the code that might lead to timeouts was removed the associated
    configuration property was deprecated.
    
    Previously, disabling subscription persistence involved disabling the
    underlying queue. This commit also adds a new configuration parameter
    to disable subscription persistence is a more elegant way and updates
    the documentation accordingly.
    
    There are also a couple of optimizations to avoid unnecessary IO.
---
 .../api/config/ActiveMQDefaultConfiguration.java   |  10 +++
 .../core/protocol/mqtt/MQTTConnectionManager.java  |   2 +-
 .../core/protocol/mqtt/MQTTProtocolHandler.java    |  15 +++-
 .../artemis/core/protocol/mqtt/MQTTSession.java    |  12 ++-
 .../core/protocol/mqtt/MQTTSessionState.java       |   4 +-
 .../core/protocol/mqtt/MQTTStateManager.java       | 100 ++++++++++-----------
 .../protocol/mqtt/MQTTSubscriptionManager.java     |  16 +++-
 .../artemis/core/config/Configuration.java         |  21 +++--
 .../core/config/impl/ConfigurationImpl.java        |  13 +++
 .../deployers/impl/FileConfigurationParser.java    |   6 +-
 .../core/server/impl/ServerSessionImpl.java        |   4 +-
 .../artemis/utils/XMLConfigurationUtil.java        |  12 +++
 .../resources/schema/artemis-configuration.xsd     |  10 ++-
 .../core/config/impl/FileConfigurationTest.java    |   1 +
 .../resources/ConfigurationTest-full-config.xml    |   1 +
 .../ConfigurationTest-xinclude-config.xml          |   1 +
 .../ConfigurationTest-xinclude-schema-config.xml   |   1 +
 docs/user-manual/mqtt.adoc                         |  28 +++---
 .../artemis/tests/integration/mqtt5/MQTT5Test.java |  43 +++++++++
 19 files changed, 215 insertions(+), 85 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 719788994d..fe5353ff6c 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -689,6 +689,9 @@ public final class ActiveMQDefaultConfiguration {
    // How long (in ms) to wait to persist MQTT session state
    private static long DEFAULT_MQTT_SESSION_STATE_PERSISTENCE_TIMEOUT = 5000;
 
+   // Whether to persist MQTT subscriptions
+   private static boolean DEFAULT_MQTT_SUBSCRIPTION_PERSISTENCE_ENABLED = true;
+
    // If SESSION-notifications should be suppressed or not
    public static boolean DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS = false;
 
@@ -1953,6 +1956,13 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_MQTT_SESSION_STATE_PERSISTENCE_TIMEOUT;
    }
 
+   /**
+    * Whether to persist MQTT subscriptions
+    */
+   public static boolean getMqttSubscriptionPersistenceEnabled() {
+      return DEFAULT_MQTT_SUBSCRIPTION_PERSISTENCE_ENABLED;
+   }
+
    public static boolean getDefaultSuppressSessionNotifications() {
       return DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS;
    }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 3890ab92ff..a4a12cb22a 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -168,7 +168,7 @@ public class MQTTConnectionManager {
                                                          null,
                                                          
session.getSessionCallback(),
                                                          
MQTTUtil.SESSION_AUTO_CREATE_QUEUE,
-                                                         
server.newOperationContext(),
+                                                         
session.getSessionContext(),
                                                          
session.getProtocolManager().getPrefixes(),
                                                          
session.getProtocolManager().getSecurityDomain(),
                                                          validatedUser,
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index c6da66b0bf..447295c229 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -44,6 +44,7 @@ import io.netty.util.CharsetUtil;
 import io.netty.util.ReferenceCountUtil;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import 
org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException;
 import 
org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -93,7 +94,8 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
    void setConnection(MQTTConnection connection, ConnectionEntry entry) throws 
Exception {
       this.connectionEntry = entry;
       this.connection = connection;
-      this.session = new MQTTSession(this, connection, protocolManager, 
server.getConfiguration().getWildcardConfiguration());
+      this.session = new MQTTSession(this, connection, protocolManager, 
server.getConfiguration().getWildcardConfiguration(), 
server.newOperationContext());
+      server.getStorageManager().setContext(session.getSessionContext());
    }
 
    void stop() {
@@ -406,7 +408,16 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
          return;
       }
       MQTTUtil.logMessage(session.getState(), message, false, 
session.getVersion());
-      ctx.writeAndFlush(message, ctx.voidPromise());
+      server.getStorageManager().afterCompleteOperations(new IOCallback() {
+         @Override
+         public void done() {
+            ctx.writeAndFlush(message, ctx.voidPromise());
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+         }
+      });
    }
 
    private int getMessageId(MqttMessage message) {
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index eadd91097b..776d370704 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -27,6 +27,7 @@ import io.netty.handler.codec.mqtt.MqttQoS;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -76,16 +77,19 @@ public class MQTTSession {
 
    private boolean usingServerKeepAlive = false;
 
+   private OperationContext sessionContext;
+
    public MQTTSession(MQTTProtocolHandler protocolHandler,
                       MQTTConnection connection,
                       MQTTProtocolManager protocolManager,
-                      WildcardConfiguration wildcardConfiguration) throws 
Exception {
+                      WildcardConfiguration wildcardConfiguration,
+                      OperationContext sessionContext) throws Exception {
       this.protocolHandler = protocolHandler;
       this.protocolManager = protocolManager;
       this.stateManager = protocolManager.getStateManager();
       this.wildcardConfiguration = wildcardConfiguration;
-
       this.connection = connection;
+      this.sessionContext = sessionContext;
 
       mqttConnectionManager = new MQTTConnectionManager(this);
       mqttPublishManager = new MQTTPublishManager(this, 
protocolManager.isCloseMqttConnectionOnPublishAuthorizationFailure());
@@ -251,6 +255,10 @@ public class MQTTSession {
       this.wildcardConfiguration = wildcardConfiguration;
    }
 
+   public OperationContext getSessionContext() {
+      return sessionContext;
+   }
+
    public CoreMessageObjectPools getCoreMessageObjectPools() {
       return coreMessageObjectPools;
    }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index c29163fc03..12918bf7b4 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -101,7 +101,7 @@ public class MQTTSessionState {
    }
 
    /**
-    * This constructor deserializes session data from a message. The format is 
as follows.
+    * This constructor deserializes subscription data from a message. The 
format is as follows.
     * <ul>
     * <li>byte: version
     * <li>int: subscription count
@@ -119,7 +119,7 @@ public class MQTTSessionState {
     * @param message the message holding the MQTT session data
     */
    public MQTTSessionState(CoreMessage message) {
-      logger.debug("Deserializing MQTT session state from {}", message);
+      logger.debug("Deserializing MQTT subscriptions from {}", message);
       this.clientId = message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
       ActiveMQBuffer buf = message.getDataBuffer();
 
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
index fc2513c918..7d536a884b 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -39,7 +37,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.transaction.Transaction;
-import 
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.slf4j.Logger;
@@ -53,7 +50,7 @@ public class MQTTStateManager {
    private final Queue sessionStore;
    private static final Map<Integer, MQTTStateManager> INSTANCES = new 
HashMap<>();
    private final Map<String, MQTTConnection> connectedClients  = new 
ConcurrentHashMap<>();
-   private final long timeout;
+   private final boolean subscriptionPersistenceEnabled;
 
    /*
     * Even though there may be multiple instances of MQTTProtocolManager (e.g. 
for MQTT on different ports) we only want
@@ -76,33 +73,37 @@ public class MQTTStateManager {
 
    private MQTTStateManager(ActiveMQServer server) throws Exception {
       this.server = server;
-      this.timeout = 
server.getConfiguration().getMqttSessionStatePersistenceTimeout();
-      this.sessionStore = 
server.createQueue(QueueConfiguration.of(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true),
 true);
-
-      // load session data from queue
-      try (LinkedListIterator<MessageReference> iterator = 
sessionStore.browserIterator()) {
-         while (iterator.hasNext()) {
-            Message message = iterator.next().getMessage();
-            if (!(message instanceof CoreMessage)) {
-               
MQTTLogger.LOGGER.sessionStateMessageIncorrectType(message.getClass().getName());
-               continue;
-            }
-            String clientId = 
message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
-            if (clientId == null || clientId.isEmpty()) {
-               MQTTLogger.LOGGER.sessionStateMessageBadClientId();
-               continue;
-            }
-            MQTTSessionState sessionState;
-            try {
-               sessionState = new MQTTSessionState((CoreMessage) message);
-            } catch (Exception e) {
-               MQTTLogger.LOGGER.errorDeserializingStateMessage(e);
-               continue;
+      this.subscriptionPersistenceEnabled = 
server.getConfiguration().isMqttSubscriptionPersistenceEnabled();
+      if (subscriptionPersistenceEnabled) {
+         this.sessionStore = 
server.createQueue(QueueConfiguration.of(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true),
 true);
+
+         // load subscription data from queue
+         try (LinkedListIterator<MessageReference> iterator = 
sessionStore.browserIterator()) {
+            while (iterator.hasNext()) {
+               Message message = iterator.next().getMessage();
+               if (!(message instanceof CoreMessage)) {
+                  
MQTTLogger.LOGGER.sessionStateMessageIncorrectType(message.getClass().getName());
+                  continue;
+               }
+               String clientId = 
message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
+               if (clientId == null || clientId.isEmpty()) {
+                  MQTTLogger.LOGGER.sessionStateMessageBadClientId();
+                  continue;
+               }
+               MQTTSessionState sessionState;
+               try {
+                  sessionState = new MQTTSessionState((CoreMessage) message);
+               } catch (Exception e) {
+                  MQTTLogger.LOGGER.errorDeserializingStateMessage(e);
+                  continue;
+               }
+               sessionStates.put(clientId, sessionState);
             }
-            sessionStates.put(clientId, sessionState);
+         } catch (NoSuchElementException ignored) {
+            // this could happen through paging browsing
          }
-      } catch (NoSuchElementException ignored) {
-         // this could happen through paging browsing
+      } else {
+         this.sessionStore = null;
       }
    }
 
@@ -152,13 +153,18 @@ public class MQTTStateManager {
       if (clientId == null) {
          return null;
       }
-      removeDurableSessionState(clientId);
-      return sessionStates.remove(clientId);
+      MQTTSessionState removed = sessionStates.remove(clientId);
+      if (removed != null && removed.getSubscriptions().size() > 0) {
+         removeDurableSubscriptionState(clientId);
+      }
+      return removed;
    }
 
-   public void removeDurableSessionState(String clientId) throws Exception {
-      int deletedCount = 
sessionStore.deleteMatchingReferences(FilterImpl.createFilter(new 
StringBuilder(Message.HDR_LAST_VALUE_NAME).append(" = 
'").append(clientId).append("'").toString()));
-      logger.debug("Removed {} durable MQTT state records for: {}", 
deletedCount, clientId);
+   public void removeDurableSubscriptionState(String clientId) throws 
Exception {
+      if (subscriptionPersistenceEnabled) {
+         int deletedCount = 
sessionStore.deleteMatchingReferences(FilterImpl.createFilter(new 
StringBuilder(Message.HDR_LAST_VALUE_NAME).append(" = 
'").append(clientId).append("'").toString()));
+         logger.debug("Removed {} durable MQTT subscription record(s) for: 
{}", deletedCount, clientId);
+      }
    }
 
    public Map<String, MQTTSessionState> getSessionStates() {
@@ -170,25 +176,13 @@ public class MQTTStateManager {
       return "MQTTSessionStateManager@" + 
Integer.toHexString(System.identityHashCode(this));
    }
 
-   public void storeSessionState(MQTTSessionState state) throws Exception {
-      logger.debug("Adding durable MQTT state record for: {}", 
state.getClientId());
-
-      /*
-       * It is imperative to ensure the routed message is actually *all the 
way* on the queue before proceeding
-       * otherwise there can be a race with removing it.
-       */
-      CountDownLatch latch = new CountDownLatch(1);
-      Transaction tx = new TransactionImpl(server.getStorageManager());
-      server.getPostOffice().route(serializeState(state, 
server.getStorageManager().generateID()), tx, false);
-      tx.addOperation(new TransactionOperationAbstract() {
-         @Override
-         public void afterCommit(Transaction tx) {
-            latch.countDown();
-         }
-      });
-      tx.commit();
-      if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
-         throw MQTTBundle.BUNDLE.unableToStoreMqttState(timeout);
+   public void storeDurableSubscriptionState(MQTTSessionState state) throws 
Exception {
+      if (subscriptionPersistenceEnabled) {
+         logger.debug("Adding durable MQTT subscription record for: {}", 
state.getClientId());
+         Transaction tx = new TransactionImpl(server.getStorageManager());
+         tx.setAsync(true);
+         server.getPostOffice().route(serializeState(state, 
server.getStorageManager().generateID()), tx, false);
+         tx.commit();
       }
    }
 
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 68c2765fa8..db45491751 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -239,6 +239,10 @@ public class MQTTSubscriptionManager {
    }
 
    short[] removeSubscriptions(List<String> topics, boolean enforceSecurity) 
throws Exception {
+      if (topics == null || topics.size() == 0) {
+         return new short[0];
+      }
+
       short[] reasonCodes;
       MQTTSessionState state = session.getState();
 
@@ -277,8 +281,14 @@ public class MQTTSubscriptionManager {
             reasonCodes[i] =  reasonCode;
          }
 
-         // store state after *all* requested subscriptions have been removed 
in memory
-         stateManager.storeSessionState(state);
+         // deal with durable state after *all* requested subscriptions have 
been removed in memory
+         if (state.getSubscriptions().size() > 0) {
+            // if there are some subscriptions left then update the state
+            stateManager.storeDurableSubscriptionState(state);
+         } else {
+            // if there are no subscriptions left then remove the state 
entirely
+            stateManager.removeDurableSubscriptionState(state.getClientId());
+         }
       }
 
       return reasonCodes;
@@ -327,7 +337,7 @@ public class MQTTSubscriptionManager {
          }
 
          // store state after *all* requested subscriptions have been created 
in memory
-         stateManager.storeSessionState(state);
+         stateManager.storeDurableSubscriptionState(state);
 
          return qos;
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 5f58cfcb3c..3526d0af76 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1420,18 +1420,29 @@ public interface Configuration {
    long getMqttSessionScanInterval();
 
    /**
-    * This is necessary because MQTT sessions and handled on a broker-wide 
basis so this can't be set on a per-connector
-    * basis like most of the other MQTT-specific settings.
+    * @deprecated This is no longer used by the broker. See
+    * {@link Configuration#setMqttSubscriptionPersistenceEnabled(boolean)}.
     */
+   @Deprecated(forRemoval = true)
    Configuration setMqttSessionStatePersistenceTimeout(long 
mqttSessionStatePersistenceTimeout);
 
    /**
-    * Get the MQTT session state persistence timeout
-    *
-    * @see Configuration#setMqttSessionStatePersistenceTimeout
+    * @deprecated This is no longer used by the broker. See {@link 
Configuration#isMqttSubscriptionPersistenceEnabled}.
     */
+   @Deprecated(forRemoval = true)
    long getMqttSessionStatePersistenceTimeout();
 
+   /**
+    * This is necessary because MQTT subsriptions are handled on a broker-wide 
basis so this can't be set on a
+    * per-connector basis like most of the other MQTT-specific settings.
+    */
+   Configuration setMqttSubscriptionPersistenceEnabled(boolean 
mqttSubscriptionPersistenceEnabled);
+
+   /**
+    * @see Configuration#setMqttSubscriptionPersistenceEnabled
+    */
+   boolean isMqttSubscriptionPersistenceEnabled();
+
    /**
     * {@return whether suppression of session-notifications is enabled for 
this server; default is {@link
     * ActiveMQDefaultConfiguration#DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS}}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 303bf59a74..4063c29bda 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -446,6 +446,8 @@ public class ConfigurationImpl implements Configuration, 
Serializable {
 
    private long mqttSessionStatePersistenceTimeout = 
ActiveMQDefaultConfiguration.getMqttSessionStatePersistenceTimeout();
 
+   private boolean mqttSessionStatePersistenceEnabled = 
ActiveMQDefaultConfiguration.getMqttSubscriptionPersistenceEnabled();
+
    private boolean suppressSessionNotifications = 
ActiveMQDefaultConfiguration.getDefaultSuppressSessionNotifications();
 
    private String literalMatchMarkers = 
ActiveMQDefaultConfiguration.getLiteralMatchMarkers();
@@ -3504,6 +3506,17 @@ public class ConfigurationImpl implements Configuration, 
Serializable {
       return this;
    }
 
+   @Override
+   public boolean isMqttSubscriptionPersistenceEnabled() {
+      return mqttSessionStatePersistenceEnabled;
+   }
+
+   @Override
+   public Configuration setMqttSubscriptionPersistenceEnabled(boolean 
mqttSubscriptionPersistenceEnabled) {
+      this.mqttSessionStatePersistenceEnabled = 
mqttSubscriptionPersistenceEnabled;
+      return this;
+   }
+
    @Override
    public boolean isSuppressSessionNotifications() {
       return suppressSessionNotifications;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 35372fc4d7..fb84d04067 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -400,6 +400,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
    private static final String INITIAL_QUEUE_BUFFER_SIZE = 
"initial-queue-buffer-size";
 
+   private static final String MQTT_SUBSCRIPTION_PERSISTENCE_ENABLED = 
"mqtt-subscription-persistence-enabled";
+
    private boolean validateAIO = false;
 
    private boolean printPageMaxSizeUsed = false;
@@ -511,7 +513,9 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       config.setMqttSessionScanInterval(getLong(e, 
"mqtt-session-scan-interval", config.getMqttSessionScanInterval(), GT_ZERO));
 
-      config.setMqttSessionStatePersistenceTimeout(getLong(e, 
"mqtt-session-state-persistence-timeout", 
config.getMqttSessionStatePersistenceTimeout(), GT_ZERO));
+      config.setMqttSessionStatePersistenceTimeout(getLong(e, 
"mqtt-session-state-persistence-timeout", 
config.getMqttSessionStatePersistenceTimeout(), GT_ZERO, 
MQTT_SUBSCRIPTION_PERSISTENCE_ENABLED));
+
+      config.setMqttSubscriptionPersistenceEnabled(getBoolean(e, 
MQTT_SUBSCRIPTION_PERSISTENCE_ENABLED, 
config.isMqttSubscriptionPersistenceEnabled()));
 
       config.setGlobalMaxSizePercentOfJvmMaxMemory(getInteger(e, 
GLOBAL_MAX_SIZE_PERCENT_JVM_MAX_MEM, 
config.getGlobalMaxSizePercentOfJvmMaxMemory(), GT_ZERO));
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index cbb3ff6f6a..4d42063db5 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -763,7 +763,9 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
       AddressSettings as = 
server.getAddressSettingsRepository().getMatch(queueConfiguration.getAddress().toString());
 
-      if (as.isAutoCreateAddresses() && 
(server.getAddressInfo(queueConfiguration.getAddress()) == null || 
!server.getAddressInfo(queueConfiguration.getAddress()).getRoutingTypes().contains(queueConfiguration.getRoutingType())))
 {
+      AddressInfo addressInfo = 
server.getAddressInfo(queueConfiguration.getAddress());
+
+      if (as.isAutoCreateAddresses() && addressInfo == null || 
!addressInfo.getRoutingTypes().contains(queueConfiguration.getRoutingType())) {
          securityCheck(queueConfiguration.getAddress(), 
queueConfiguration.getName(), CheckType.CREATE_ADDRESS, this);
       }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/utils/XMLConfigurationUtil.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/utils/XMLConfigurationUtil.java
index 0c64df604b..3ea9395250 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/utils/XMLConfigurationUtil.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/utils/XMLConfigurationUtil.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.utils;
 
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
@@ -69,8 +70,19 @@ public class XMLConfigurationUtil {
                                     final String name,
                                     final long def,
                                     final Validator<Number> validator) {
+      return getLong(e, name, def, validator, null);
+   }
+
+   public static final Long getLong(final Element e,
+                                    final String name,
+                                    final long def,
+                                    final Validator<Number> validator,
+                                    final String alternativeForDeprecated) {
       NodeList nl = e.getElementsByTagName(name);
       if (nl.getLength() > 0) {
+         if (alternativeForDeprecated != null) {
+            ActiveMQServerLogger.LOGGER.deprecatedConfigurationOption(name, 
alternativeForDeprecated);
+         }
          return (Long) validator.validate(name, XMLUtil.parseLong(nl.item(0)));
       } else {
          return def;
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 1edb3b5219..be4b57e59a 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -466,7 +466,15 @@
          <xsd:element name="mqtt-session-state-persistence-timeout" 
type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
-                  how long (in ms) to wait to persist MQTT session state
+                  DEPRECATED: how long (in ms) to wait to persist MQTT session 
state
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
+         <xsd:element name="mqtt-subscription-persistence-enabled" 
type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  whether to persist MQTT subscriptions
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 490d491fff..24c32aaf9d 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -246,6 +246,7 @@ public class FileConfigurationTest extends 
AbstractConfigurationTestBase {
       assertFalse(configInstance.isRejectEmptyValidatedUser());
       assertEquals(123456, configInstance.getMqttSessionScanInterval());
       assertEquals(567890, 
configInstance.getMqttSessionStatePersistenceTimeout());
+      assertFalse(configInstance.isMqttSubscriptionPersistenceEnabled());
       assertEquals(98765, configInstance.getConnectionTtlCheckInterval());
       assertEquals(1234567, 
configInstance.getConfigurationFileRefreshPeriod());
       assertEquals("UUID", configInstance.getTemporaryQueueNamespace());
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index f76d065fdd..f8744d4154 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -58,6 +58,7 @@
       <reject-empty-validated-user>false</reject-empty-validated-user>
       <mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
       
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
+      
<mqtt-subscription-persistence-enabled>false</mqtt-subscription-persistence-enabled>
       <connection-ttl-check-interval>98765</connection-ttl-check-interval>
       
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
       <temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 2545c1d437..e6f3e827f0 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -59,6 +59,7 @@
       <reject-empty-validated-user>false</reject-empty-validated-user>
       <mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
       
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
+      
<mqtt-subscription-persistence-enabled>false</mqtt-subscription-persistence-enabled>
       <connection-ttl-check-interval>98765</connection-ttl-check-interval>
       
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
       <temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
index 05e3295841..eec3aae8e7 100644
--- 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
+++ 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
@@ -59,6 +59,7 @@
       <reject-empty-validated-user>false</reject-empty-validated-user>
       <mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
       
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
+      
<mqtt-subscription-persistence-enabled>false</mqtt-subscription-persistence-enabled>
       <connection-ttl-check-interval>98765</connection-ttl-check-interval>
       
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
       <temporary-queue-namespace>TEMP</temporary-queue-namespace>
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index 59f94a50a3..5cadbaa7cb 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -92,29 +92,29 @@ Payload logging is limited to avoid filling the logs with 
potentially hundreds o
 
 == Persistent Subscriptions
 
-The subscription information for MQTT sessions is stored in an internal queue 
named `$sys.mqtt.sessions` and persisted to storage (assuming persistence is 
enabled).
-The information is durable so that MQTT subscribers can reconnect and resume 
their subscriptions seamlessly after a broker restart, failure, etc.
+The subscription information for MQTT sessions is kept in an internal queue 
named `$sys.mqtt.sessions` and persisted to storage (assuming persistence is 
enabled).
+The information is durable so that MQTT subscribers can reconnect and resume 
their subscriptions seamlessly after a broker restart, failure, etc. without 
having to resend a `SUBSCRIBE` packet.
 When brokers are configured for high availability this information will be 
available on the backup so even in the case of a broker fail-over subscribers 
will be able to resume their subscriptions.
 
-While persistent subscriptions can be convenient they impose a performance 
penalty since data must be written to storage.
-If you don't need the convenience (e.g. you always use clean sessions) and/or 
you don't want the performance penalty then you can disable it by disabling the 
`$sys.mqtt.sessions` queue in `broker.xml`, e.g.:
+While persistent subscriptions can be convenient they impose a performance 
penalty since data must be written to and removed from storage.
+If you don't need the convenience (e.g. you always use clean sessions) and/or 
you don't want the performance penalty then you can disable it by setting 
`mqtt-subscription-persistence-enabled` to `false` in `broker.xml`, e.g.:
 
 [,xml]
 ----
-<addresses>
+<core>
    ...
-   <address name="$sys.mqtt.sessions">
-      <anycast>
-         <queue name="$sys.mqtt.sessions" enabled="false"/>
-      </anycast>
-   </address>
+   
<mqtt-subscription-persistence-enabled>false</mqtt-subscription-persistence-enabled>
    ...
-</addresses>
+</core>
 ----
 
-The setting `mqtt-session-state-persistence-timeout` controls how long the 
broker will wait for the data to be written to storage before throwing an error.
-It is measured in milliseconds.
-The default is `5000`.
+The default is `true`.
+
+[NOTE]
+====
+Even if `mqtt-subscription-persistence-enabled` is `false` the broker will 
still keep track of QoS 1 & 2 messages.
+The _only_ impact of disabling MQTT subscription persistence is that clients 
will have to resend `SUBSCRIBE` packets as necessary in order to continue 
receiving messages after reconnecting after the server is restarted.
+====
 
 == Custom Client ID Handling
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index d23a3959d8..187b76fc7c 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -24,6 +24,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -56,6 +58,7 @@ import org.eclipse.paho.mqttv5.client.MqttCallback;
 import org.eclipse.paho.mqttv5.client.MqttClient;
 import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
 import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
+import org.eclipse.paho.mqttv5.common.MqttException;
 import org.eclipse.paho.mqttv5.common.MqttMessage;
 import org.eclipse.paho.mqttv5.common.MqttSubscription;
 import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
@@ -871,4 +874,44 @@ public class MQTT5Test extends MQTT5TestSupport {
       Wait.assertEquals(1L, () -> subscriptionQueue.getMessagesAcknowledged(), 
2000, 50);
       Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 2000, 
50);
    }
+
+   @Test
+   @Timeout(DEFAULT_TIMEOUT_SEC)
+   public void testLoadOnSubscriptionPersistence() throws Exception {
+      String topic = RandomUtil.randomUUIDString();
+      int numberOfThreads = 250;
+      AtomicBoolean failed = new AtomicBoolean(false);
+      ExecutorService executorService = 
Executors.newFixedThreadPool(numberOfThreads);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch latch = new CountDownLatch(numberOfThreads);
+
+      for (int i = 0; i < numberOfThreads; i++) {
+         executorService.submit(() -> {
+            MqttClient subscriber = null;
+            try {
+               subscriber = createPahoClient(RandomUtil.randomUUIDString());
+               subscriber.connect();
+               subscriber.subscribe(topic, AT_LEAST_ONCE);
+               subscriber.unsubscribe(topic);
+            } catch (MqttException e) {
+               logger.error(e.getMessage(), e);
+               failed.set(true);
+            } finally {
+               if (subscriber != null) {
+                  try {
+                     subscriber.disconnect();
+                     subscriber.close();
+                  } catch (MqttException e) {
+                     // ignore
+                  }
+               }
+               latch.countDown();
+            }
+         });
+      }
+
+      assertTrue(latch.await(1, TimeUnit.MINUTES), "not all tasks finished");
+      assertFalse(failed.get());
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to