gemmellr commented on code in PR #4583:
URL: https://github.com/apache/activemq-artemis/pull/4583#discussion_r1301861592
##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java:
##########
@@ -79,20 +78,31 @@ public class MQTTProtocolManager extends
AbstractProtocolManager<MqttMessage, MQ
private final MQTTRoutingHandler routingHandler;
+ private MQTTSessionStateManager sessionStateManager;
+
MQTTProtocolManager(ActiveMQServer server,
List<BaseInterceptor> incomingInterceptors,
- List<BaseInterceptor> outgoingInterceptors) {
+ List<BaseInterceptor> outgoingInterceptors) throws
Exception {
this.server = server;
this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
server.getManagementService().addNotificationListener(this);
routingHandler = new MQTTRoutingHandler(server);
+ sessionStateManager = MQTTSessionStateManager.getInstance(server);
+ server.registerActivateCallback(new CleaningActivateCallback() {
+ @Override
+ public void deActivate() {
+ MQTTSessionStateManager.removeInstance(server);
+ sessionStateManager = null;
+ }
+ });
}
public int getDefaultMqttSessionExpiryInterval() {
return defaultMqttSessionExpiryInterval;
}
public MQTTProtocolManager setDefaultMqttSessionExpiryInterval(int
sessionExpiryInterval) {
+ System.out.println("setDefaultMqttSessionExpiryInterval: " +
sessionExpiryInterval);
Review Comment:
Looks like a leftover debug statement... delete it or use logging?
##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java:
##########
@@ -91,8 +98,50 @@ public class MQTTSessionState {
private Map<String, Integer> serverTopicAliases;
- public MQTTSessionState(String clientId) {
+ public MQTTSessionState(String clientId, MQTTSessionStateManager
stateManager) {
this.clientId = clientId;
+ this.stateManager = stateManager;
+ }
+
+ /**
+ * This constructor deserializes session data from a message. The format is
as follows.
+ *
+ * - byte: version
+ * - int: subscription count
+ *
+ * There may be 0 or more subscriptions. The subscription format is as
follows.
+ *
+ * - String: topic name
+ * - int: QoS
+ * - boolean: no-local
+ * - boolean: retain as published
+ * - int: retain handling
+ * - int (nullable): subscription identifier
Review Comment:
Curious, what is the case when a durable subscription wouldn't have an id?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java:
##########
@@ -92,7 +92,7 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][] {
{TCP},
- {WS}
+// {WS}
Review Comment:
Leftover?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java:
##########
@@ -82,6 +83,69 @@ public void testTimestamp() throws Exception {
context.close();
}
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testResumeSubscriptionsAfterRestart() throws Exception {
+ final int SUBSCRIPTION_COUNT = 100;
+ List<String> topicNames = new ArrayList<>(SUBSCRIPTION_COUNT);
+ for (int i = 0; i < SUBSCRIPTION_COUNT; i++) {
+ topicNames.add(RandomUtil.randomString());
Review Comment:
Trace logs etc. might be easier to read if just using simple
'getTestName + i' names.
##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java:
##########
@@ -372,7 +374,7 @@ public static void logMessage(MQTTSessionState state,
MqttMessage message, boole
break;
}
- logger.trace(log.toString());
+ logger.info(log.toString());
Review Comment:
Seems a bit spammy, leftover debug change? EDIT: Also, the whole method is
wrapped in an 'isTraceEnabled' gate.
##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionStateManager.java:
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MQTTSessionStateManager {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private ActiveMQServer server;
+ private final Map<String, MQTTSessionState> sessionStates = new
ConcurrentHashMap<>();
+ private final Queue sessionStore;
+ private static Map<Integer, MQTTSessionStateManager> INSTANCES = new
HashMap<>();
+
+ /*
+ * Even though there may be multiple instances of MQTTProtocolManager (e.g.
for MQTT on different ports) we only want
+ * one instance of MQTTSessionStateManager per-broker with the
understanding that there can be multiple brokers in
+ * the same JVM.
+ */
+ public static synchronized MQTTSessionStateManager
getInstance(ActiveMQServer server) throws Exception {
+ MQTTSessionStateManager instance =
INSTANCES.get(System.identityHashCode(server));
+ if (instance == null) {
+ instance = new MQTTSessionStateManager(server);
+ INSTANCES.put(System.identityHashCode(server), instance);
+ }
+
+ return instance;
+ }
+
+ public static synchronized void removeInstance(ActiveMQServer server) {
+ INSTANCES.remove(System.identityHashCode(server));
+ }
+
+ private MQTTSessionStateManager(ActiveMQServer server) throws Exception {
+ this.server = server;
+ sessionStore = server.createQueue(new
QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true),
true);
+
+ // load session data from queue
+ try (LinkedListIterator<MessageReference> iterator =
sessionStore.browserIterator()) {
+ try {
+ while (iterator.hasNext()) {
+ MessageReference ref = iterator.next();
+ String clientId =
ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME);
+ MQTTSessionState sessionState = new
MQTTSessionState((CoreMessage) ref.getMessage(), this);
+ sessionStates.put(clientId, sessionState);
+ }
+ } catch (NoSuchElementException ignored) {
+ // this could happen through paging browsing
+ }
+ }
+ }
+
+ public void scanSessions() {
+ List<String> toRemove = new ArrayList();
+ for (Map.Entry<String, MQTTSessionState> entry :
sessionStates.entrySet()) {
+ MQTTSessionState state = entry.getValue();
+ logger.debug("Inspecting session: {}", state);
+ int sessionExpiryInterval = state.getClientSessionExpiryInterval();
+ if (!state.isAttached() && sessionExpiryInterval > 0 &&
state.getDisconnectedTime() + (sessionExpiryInterval * 1000) <
System.currentTimeMillis()) {
+ toRemove.add(entry.getKey());
+ }
+ if (state.isWill() && !state.isAttached() && state.isFailed() &&
state.getWillDelayInterval() > 0 && state.getDisconnectedTime() +
(state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) {
+ state.getSession().sendWillMessage();
+ }
+ }
+
+ for (String key : toRemove) {
+ try {
+ MQTTSessionState state = removeSessionState(key);
+ if (state != null && state.isWill() && !state.isAttached() &&
state.isFailed()) {
+ state.getSession().sendWillMessage();
+ }
+ } catch (Exception e) {
+ // TODO: make this a real error message
+ e.printStackTrace();
Review Comment:
still TODO?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]