This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 96fa98fc93 ARTEMIS-3789 respect session expiry interval on MQTT disconnect message
new 98eb31a2d7 This closes #4037
96fa98fc93 is described below
commit 96fa98fc93c0769b132784df539ad3eb6ce81a78
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Apr 20 12:33:51 2022 -0500
ARTEMIS-3789 respect session expiry interval on MQTT disconnect message
---
.../core/protocol/mqtt/MQTTProtocolHandler.java | 13 +++++++++++-
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 23 +++++++++++++++++++++-
2 files changed, 34 insertions(+), 2 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 26535d9b7d..67a3cce4c1 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -51,6 +51,7 @@ import org.jboss.logging.Logger;
import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_DATA;
import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL;
/**
* This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
@@ -173,7 +174,7 @@ public class MQTTProtocolHandler extends
ChannelInboundHandlerAdapter {
handleUnsubscribe((MqttUnsubscribeMessage) message);
break;
case DISCONNECT:
- disconnect(false);
+ disconnect(false, message);
break;
case UNSUBACK:
case SUBACK:
@@ -280,6 +281,16 @@ public class MQTTProtocolHandler extends
ChannelInboundHandlerAdapter {
}
void disconnect(boolean error) {
+ disconnect(error, null);
+ }
+
+ void disconnect(boolean error, MqttMessage disconnect) {
+ if (disconnect != null && disconnect.variableHeader() instanceof
MqttReasonCodeAndPropertiesVariableHeader) {
+ Integer sessionExpiryInterval = MQTTUtil.getProperty(Integer.class,
((MqttReasonCodeAndPropertiesVariableHeader)disconnect.variableHeader()).properties(),
SESSION_EXPIRY_INTERVAL, null);
+ if (sessionExpiryInterval != null) {
+
session.getState().setClientSessionExpiryInterval(sessionExpiryInterval);
+ }
+ }
session.getConnectionManager().disconnect(error);
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 1a4d398158..f7f3085bf7 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -20,16 +20,17 @@ package org.apache.activemq.artemis.tests.integration.mqtt5;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Message;
-
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
+import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
@@ -164,4 +165,24 @@ public class MQTT5Test extends MQTT5TestSupport {
client2.disconnectForcibly(0, 0, false);
assertTrue(latch.await(2, TimeUnit.SECONDS));
}
+
+ /*
+ * It's possible for a client to change their session expiry interval via the DISCONNECT packet. Ensure we respect
+ * a new session expiry interval when disconnecting.
+ */
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testExpiryDelayOnDisconnect() throws Exception {
+ final String CONSUMER_ID = RandomUtil.randomString();
+
+ MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID);
+ MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
+ .sessionExpiryInterval(300L)
+ .build();
+ consumer.connect(options).waitForCompletion();
+ MqttProperties disconnectProperties = new MqttProperties();
+ disconnectProperties.setSessionExpiryInterval(0L);
+ consumer.disconnect(0, null, null, MQTTReasonCodes.SUCCESS,
disconnectProperties).waitForCompletion();
+
+ Wait.assertEquals(0, () -> getSessionStates().size(), 5000, 10);
+ }
}