[
https://issues.apache.org/jira/browse/ARTEMIS-966?focusedWorklogId=877918&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-877918
]
ASF GitHub Bot logged work on ARTEMIS-966:
------------------------------------------
Author: ASF GitHub Bot
Created on: 23/Aug/23 16:49
Start Date: 23/Aug/23 16:49
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4583:
URL: https://github.com/apache/activemq-artemis/pull/4583#discussion_r1303294333
##########
artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/protocol/mqtt/SessionStateManagerTest.java:
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.mqtt;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionStateManager;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SessionStateManagerTest extends ActiveMQTestBase {
+
+ protected ActiveMQServer server;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration config = createDefaultInVMConfig();
+ server = createServer(true, config);
+ server.start();
+ }
+
+ @Test(timeout = 30000)
+ public void testSessionStateManager() throws Exception {
+ final long SESSION_COUNT = 500;
+
+ List<Pair<String, Collection<Pair<MqttTopicSubscription, Integer>>>>
sessions = new ArrayList<>();
+ for (int i = 0; i < SESSION_COUNT; i++) {
+ List<Pair<MqttTopicSubscription, Integer>> subs = new ArrayList<>();
+ Pair<String, Collection<Pair<MqttTopicSubscription, Integer>>>
session = new Pair<>(RandomUtil.randomString(), subs);
+ for (int j = 0; j < RandomUtil.randomInterval(1, 50); j++) {
+ MqttTopicSubscription sub = new
MqttTopicSubscription(RandomUtil.randomString(),
+ new
MqttSubscriptionOption(MqttQoS.valueOf(RandomUtil.randomInterval(0, 3)),
+
RandomUtil.randomBoolean(),
+
RandomUtil.randomBoolean(),
+
MqttSubscriptionOption.RetainedHandlingPolicy.valueOf(RandomUtil.randomInterval(0,
3))));
+ subs.add(new Pair(sub, RandomUtil.randomPositiveIntOrNull()));
+ }
+ sessions.add(session);
+ }
+
+ MQTTSessionStateManager stateManager =
MQTTSessionStateManager.getInstance(server);
+
+ for (Pair<String, Collection<Pair<MqttTopicSubscription, Integer>>>
session : sessions) {
+ String clientId = session.getA();
+ Collection<Pair<MqttTopicSubscription, Integer>> subs =
session.getB();
+ MQTTSessionState sessionState = new MQTTSessionState(clientId,
stateManager);
+ for (Pair<MqttTopicSubscription, Integer> sub : subs) {
+ sessionState.addSubscription(sub.getA(), MQTTUtil.MQTT_WILDCARD,
sub.getB());
+ }
+ }
+
+ Wait.assertEquals(SESSION_COUNT, () ->
server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 2000, 100);
+
+ server.stop();
+ server.start();
+
+ Wait.assertEquals(SESSION_COUNT, () ->
server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 2000, 100);
+
+ stateManager = MQTTSessionStateManager.getInstance(server);
+
+ for (Pair<String, Collection<Pair<MqttTopicSubscription, Integer>>>
session : sessions) {
+ String clientId = session.getA();
+ Collection<Pair<MqttTopicSubscription, Integer>> mySubs =
session.getB();
+ Map<String, MQTTSessionState> sessionStates =
stateManager.getSessionStates();
+ assertTrue(sessionStates.containsKey(clientId));
+ MQTTSessionState state = sessionStates.get(clientId);
+ Collection<Pair<MqttTopicSubscription, Integer>> brokerSubs =
state.getSubscriptionsPlusID();
+ assertEquals(mySubs.size(), brokerSubs.size());
+ for (Pair<MqttTopicSubscription, Integer> mySub : mySubs) {
+ boolean found = false;
+ for (Pair<MqttTopicSubscription, Integer> brokerSub : brokerSubs) {
+ if (compareSubs(mySub.getA(), brokerSub.getA())) {
+ found = true;
+ assertEquals(mySub.getB(), brokerSub.getB());
+ break;
+ }
+ }
+ assertTrue(found);
+ }
+ }
+ }
+
+ private boolean compareSubs(MqttTopicSubscription a, MqttTopicSubscription
b) {
+ if (a == b)
+ return true;
+ if (a == null || b == null)
+ return false;
+ if (a.topicName() == null) {
+ if (b.topicName() != null)
+ return false;
+ } else if (!a.topicName().equals(b.topicName())) {
+ return false;
+ }
+ if (a.option() == null) {
+ if (b.option() != null)
+ return false;
+ } else {
+ if (a.option().qos() == null) {
+ if (b.option().qos() != null)
+ return false;
+ } else if (a.option().qos().value() != b.option().qos().value()) {
+ return false;
+ }
+ if (a.option().retainHandling() == null) {
+ if (b.option().retainHandling() != null)
+ return false;
+ } else if (a.option().retainHandling().value() !=
b.option().retainHandling().value()) {
+ return false;
+ }
+ if (a.option().isRetainAsPublished() !=
b.option().isRetainAsPublished()) {
+ return false;
+ }
+ if (a.option().isNoLocal() != b.option().isNoLocal()) {
+ return false;
+ }
Review Comment:
Its a bit weird mixing single-line-ifs with-braces and without-braces in the
same method multiple times. With braces is nicer and safer. Some separating
newlines would make it a lot more readable too.
(I find it bizarre the build enforces there is a space before the brace but
also allows it not to be there hehe)
Issue Time Tracking
-------------------
Worklog Id: (was: 877918)
Time Spent: 1.5h (was: 1h 20m)
> MQTT Session States do not survive a reboot
> -------------------------------------------
>
> Key: ARTEMIS-966
> URL: https://issues.apache.org/jira/browse/ARTEMIS-966
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Components: MQTT
> Reporter: Martyn Taylor
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)