This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8e68bb1902 ARTEMIS-4501 clean up MQTT subscription queues when session
expires
8e68bb1902 is described below
commit 8e68bb1902f0044db36f0c4b99acc74495039585
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Nov 13 12:02:46 2023 -0600
ARTEMIS-4501 clean up MQTT subscription queues when session expires
---
.../core/protocol/mqtt/MQTTStateManager.java | 7 ++++--
docs/user-manual/mqtt.adoc | 26 ++++++----------------
docs/user-manual/versions.adoc | 19 ++++++++++++++++
.../mqtt5/spec/controlpackets/ConnectTests.java | 13 ++++++++---
4 files changed, 41 insertions(+), 24 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
index aa913b8f3a..89705f067c 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
@@ -110,8 +110,11 @@ public class MQTTStateManager {
for (String key : toRemove) {
try {
MQTTSessionState state = removeSessionState(key);
- if (state != null && state.isWill() && !state.isAttached() &&
state.isFailed()) {
- state.getSession().sendWillMessage();
+ if (state != null) {
+ if (state.isWill() && !state.isAttached() && state.isFailed()) {
+ state.getSession().sendWillMessage();
+ }
+ state.getSession().clean(false);
}
} catch (Exception e) {
MQTTLogger.LOGGER.failedToRemoveSessionState(key, e);
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index d460279bd8..2adea4926a 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -165,27 +165,15 @@ In the case of MQTT 5 clients they will receive a
disconnect reason code of http
== Automatic Subscription Clean-up
-Sometimes MQTT clients using `CleanSession=false` don't clean up their
subscriptions.
-In such situations the following address-setting can be used to clean up the
abandoned subscription queues:
+Sometimes MQTT 3.x clients using `CleanSession=false` don't properly
unsubscribe. The URL parameter `defaultMqttSessionExpiryInterval` can be
configured on the MQTT `acceptor` so that abandoned sessions and subscription
queues will be cleaned up automatically after the expiry interval elapses.
-[,xml]
-----
- <address-setting match="myMqttAddress">
- <auto-delete-created-queues>true</auto-delete-created-queues>
- <auto-delete-queues-delay>3600000</auto-delete-queues-delay> <!-- 1 hour
delay -->
- <auto-delete-queues-message-count>-1</auto-delete-queues-message-count>
<!-- doesn't matter how many messages there are -->
- </address-setting>
-----
-
-However, the MQTT session meta-data is still present in memory and needs to be
cleaned up as well.
-The URL parameter `defaultMqttSessionExpiryInterval` can be configured on the
MQTT `acceptor` to deal with this situation.
-
-MQTT 5 added a new
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[session
expiry interval] property with the same basic semantics.
-The broker will use the client's value for this property if it is set.
-If it is not set then it will apply the `defaultMqttSessionExpiryInterval`.
+MQTT 5 has the same basic semantics with slightly different configuration.
+The `CleanSession` flag was replaced with `CleanStart` and a
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[session
expiry interval] property.
+The broker will use the client's session expiry interval if it is set.
+If it is not set then the broker will apply the
`defaultMqttSessionExpiryInterval`.
-The default `defaultMqttSessionExpiryInterval` is `-1` which means no MQTT 3.x
session states will be expired and no MQTT 5 session states which do not pass
their own session expiry interval will be expired.
-Otherwise it represents the number of *seconds* which must elapse after the
client has disconnected before the broker will remove the session state.
+The default `defaultMqttSessionExpiryInterval` is `-1` which means no clean up
will happen for MQTT 3.x clients or for MQTT 5 clients which do not pass their
own session expiry interval.
+Otherwise it represents the number of *seconds* which must elapse after the
client has disconnected before the broker will remove the session state and
subscription queues.
MQTT session state is scanned every 5,000 milliseconds by default.
This can be changed using the `mqtt-session-scan-interval` element set in the
`core` section of `broker.xml`.
diff --git a/docs/user-manual/versions.adoc b/docs/user-manual/versions.adoc
index a99ccfb1ca..c2e71637a7 100644
--- a/docs/user-manual/versions.adoc
+++ b/docs/user-manual/versions.adoc
@@ -12,6 +12,25 @@ NOTE: If the upgrade spans multiple versions then the steps
from *each* version
NOTE: Follow the general upgrade procedure outlined in the
xref:upgrading.adoc#upgrading-the-broker[Upgrading the Broker] chapter in
addition to any version-specific upgrade instructions outlined here.
+== 2.32.0
+
+https://issues.apache.org/jira/secure/ReleaseNote.jspa...
+
+=== Highlights
+
+* highlight 1
+* highlight 2
+
+=== Upgrading from 2.31.x
+
+* Due to https://issues.apache.org/jira/browse/ARTEMIS-4501[ARTEMIS-4501] MQTT
subscription queues will be automatically removed when the corresponding
session expires, either based on the session expiry interval passed by an MQTT
5 client or based on the configured `defaultMqttSessionExpiryInterval` for MQTT
3.x clients or MQTT 5 clients which don't explicitly pass a session expiry
interval.
++
+Prior to this change removing subscription queues relied on the generic
`auto-delete-*` `address-settings`.
++
+These settings are now no longer required.
++
+Configure `defaultMqttSessionExpiryInterval` instead.
+
== 2.31.2
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12353776[Full
release notes]
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
index a3b6eb3c03..f423fce626 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
@@ -968,13 +968,20 @@ public class ConnectTests extends MQTT5TestSupport {
.build();
consumer.connect(options);
consumer.subscribe(TOPIC, 2);
+ long start = System.currentTimeMillis();
consumer.disconnect();
- // session should *not* still exist since session expiry interval has
passed
- long start = System.currentTimeMillis();
+ // ensure the subscription queue still exists since the session hasn't
expired
+ assertNotNull(getSubscriptionQueue(TOPIC, CONSUMER_ID));
+
Wait.assertEquals(0, () -> getSessionStates().size(), EXPIRY_INTERVAL *
1000 * 2, 100);
- assertTrue(System.currentTimeMillis() - start > (EXPIRY_INTERVAL *
1000));
+ assertTrue(System.currentTimeMillis() - start >= (EXPIRY_INTERVAL *
1000));
+
+ // session should *not* still exist since session expiry interval has
passed
assertNull(getSessionStates().get(CONSUMER_ID));
+
+ // ensure the subscription queue is cleaned up when the session expires
+ Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, CONSUMER_ID) == null,
2000, 100);
}
/*