DongyuanPan commented on code in PR #131: URL: https://github.com/apache/rocketmq-mqtt/pull/131#discussion_r941311943
########## meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcess.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt.meta.raft.processor; + +import com.alibaba.fastjson.JSON; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.google.protobuf.ByteString; +import org.apache.rocketmq.mqtt.common.model.Trie; +import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest; +import org.apache.rocketmq.mqtt.common.model.consistency.Response; +import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest; +import org.apache.rocketmq.mqtt.common.util.TopicUtils; +import org.apache.rocketmq.mqtt.meta.raft.snapshot.SnapshotOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; + + +public class RetainedMsgStateProcess extends StateProcessor { + + private static Logger logger = LoggerFactory.getLogger(RetainedMsgStateProcess.class); + private final ConcurrentHashMap<String, byte[]> retainedMsgMap = new ConcurrentHashMap<>(); //key:topic value:retained msg + private final ConcurrentHashMap<String, Trie<String, String>> retainedMsgTopicTrie = new ConcurrentHashMap<>(); //key:firstTopic value:retained topic Trie + + protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private SnapshotOperation snapshotOperation; + private int maxRetainedMessageNum ; + + public RetainedMsgStateProcess(int maxRetainedMessageNum) { + setMaxRetainedMessageNum(maxRetainedMessageNum); + } + @Override + public Response onReadRequest(ReadRequest request) { + try { + String topic = request.getExtDataMap().get("topic"); + String firstTopic = request.getExtDataMap().get("firstTopic"); + String operation = request.getOperation(); + + logger.info("FirstTopic:{} Topic:{} Operation:{}",firstTopic,topic,operation); + + if (operation.equals("topic")) { //return retained msg + byte[] msgBytes = retainedMsgMap.get(topic); + return Response.newBuilder() + .setSuccess(true) + .setData(ByteString.copyFrom(msgBytes)) + .build(); + } else { //return retain msgs of matched Topic + if (!retainedMsgTopicTrie.containsKey(firstTopic)) { + Trie<String, String> newTrie = new Trie<>(); + retainedMsgTopicTrie.put(firstTopic, newTrie); + } + Trie<String, String> tmpTrie = retainedMsgTopicTrie.get(firstTopic); + Set<String> matchTopics = tmpTrie.getAllPath(topic); + + ArrayList<byte[]> msgResults = new ArrayList<>(); + + for (String tmpTopic:matchTopics) { + byte[] msgBytes = retainedMsgMap.get(tmpTopic); + if (msgBytes != null) { + msgResults.add(msgBytes); + } + } + return Response.newBuilder() + .setSuccess(true) + .setData(ByteString.copyFrom(JSON.toJSONBytes(msgResults))) //return retained msgs of matched Topic + .build(); + } + } catch (Exception e) { + logger.error(String.valueOf(e)); + return Response.newBuilder() + .setSuccess(false) + .setErrMsg(e.getMessage()) + .build(); + } + } + + boolean setRetainedMsg(String firstTopic,String topic, boolean isEmpty,byte[] msg) { + + // if the trie of firstTopic doesn't exist + if (!retainedMsgTopicTrie.containsKey(firstTopic)) { + retainedMsgTopicTrie.put(TopicUtils.normalizeTopic(firstTopic), new Trie<String, String>()); + } + + + if (isEmpty) { + //delete from trie + logger.info("Delete the topic {} retained message", topic); + retainedMsgMap.remove(TopicUtils.normalizeTopic(topic)); + retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic)).deleteTrieNode(topic, ""); + } else { + //Add to trie + Trie<String, String> trie = retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic)); + logger.info("maxRetainedMessageNum:{}",maxRetainedMessageNum); + if (trie.getNodePath().size() < maxRetainedMessageNum) { + retainedMsgMap.put(TopicUtils.normalizeTopic(topic), msg); + retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic)).addNode(topic, "", ""); + return true; + } else { + return false; + } + } + + return true; + } + + @Override + public Response onWriteRequest(WriteRequest writeRequest) { + + try { + String firstTopic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("firstTopic")); //retained msg firstTopic + String topic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("topic")); //retained msg topic + boolean isEmpty = Boolean.parseBoolean(writeRequest.getExtDataMap().get("isEmpty")); //retained msg is empty + byte[] message = writeRequest.getData().toByteArray(); + boolean res = setRetainedMsg(firstTopic,topic, isEmpty,message); Review Comment: boolean res = setRetainedMsg(firstTopic,topic, isEmpty,message); should be format ########## mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.rocketmq.mqtt.ds.meta; + + +import com.alipay.sofa.jraft.error.RemotingException; + +import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager; +import org.apache.rocketmq.mqtt.common.facade.RetainedPersistManager; +import org.apache.rocketmq.mqtt.common.model.Message; +import org.apache.rocketmq.mqtt.common.model.Subscription; + +import org.apache.rocketmq.mqtt.ds.retain.RetainedMsgClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.annotation.Resource; +import java.util.ArrayList; + + +import java.util.concurrent.CompletableFuture; + + +public class RetainedPersistManagerImpl implements RetainedPersistManager { + + private static Logger logger = LoggerFactory.getLogger(RetainedPersistManagerImpl.class); + + + @Resource + private MetaPersistManager metaPersistManager; + + public void init() { + + + } + Review Comment: too many blank lines ########## mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java: ########## @@ -51,23 +57,49 @@ public class PublishProcessor implements UpstreamProcessor { @Resource private FirstTopicManager firstTopicManager; + @Resource + RetainedPersistManager retainedPersistManager; @Override - public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage mqttMessage) { + public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage mqttMessage) throws RemotingException, com.alipay.sofa.jraft.error.RemotingException, ExecutionException, InterruptedException { MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; + boolean isEmpty = false; + //deal empty payload + if (ByteBufUtil.getBytes(mqttPublishMessage.content()).length == 0) { + mqttPublishMessage = MessageUtil.dealEmptyMessage(mqttPublishMessage); + isEmpty = true; + } String msgId = MessageClientIDSetter.createUniqID(); MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader(); String originTopic = variableHeader.topicName(); String pubTopic = TopicUtils.normalizeTopic(originTopic); MqttTopic mqttTopic = TopicUtils.decode(pubTopic); - firstTopicManager.checkFirstTopicIfCreated(mqttTopic.getFirstTopic()); - Set<String> queueNames = wildcardManager.matchQueueSetByMsgTopic(pubTopic, context.getNamespace()); + firstTopicManager.checkFirstTopicIfCreated(mqttTopic.getFirstTopic()); // Check the firstTopic is existed + Set<String> queueNames = wildcardManager.matchQueueSetByMsgTopic(pubTopic, context.getNamespace()); //According to topic to find queue + long bornTime = System.currentTimeMillis(); + + MqttPublishMessage retainedMqttPublishMessage = mqttPublishMessage.copy(); + + if (mqttPublishMessage.fixedHeader().isRetain()) { + //Change the retained flag of message that will send MQ is 0 + mqttPublishMessage = MessageUtil.removeRetainedFlag(mqttPublishMessage); + //Keep the retained flag of message that will store meta + Message metaMessage = MessageUtil.toMessage(retainedMqttPublishMessage); + metaMessage.setMsgId(msgId); + metaMessage.setBornTimestamp(bornTime); + metaMessage.setEmpty(isEmpty); + CompletableFuture<Boolean> booleanCompletableFuture = retainedPersistManager.storeRetainedMessage(TopicUtils.normalizeTopic(metaMessage.getOriginTopic()), metaMessage); Review Comment: if store retain message error, send message action will failed. it should be notify cs and user ########## mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java: ########## @@ -51,23 +57,49 @@ public class PublishProcessor implements UpstreamProcessor { @Resource private FirstTopicManager firstTopicManager; + @Resource + RetainedPersistManager retainedPersistManager; @Override - public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage mqttMessage) { + public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage mqttMessage) throws RemotingException, com.alipay.sofa.jraft.error.RemotingException, ExecutionException, InterruptedException { MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; + boolean isEmpty = false; + //deal empty payload + if (ByteBufUtil.getBytes(mqttPublishMessage.content()).length == 0) { + mqttPublishMessage = MessageUtil.dealEmptyMessage(mqttPublishMessage); + isEmpty = true; + } String msgId = MessageClientIDSetter.createUniqID(); MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader(); String originTopic = variableHeader.topicName(); String pubTopic = TopicUtils.normalizeTopic(originTopic); MqttTopic mqttTopic = TopicUtils.decode(pubTopic); - firstTopicManager.checkFirstTopicIfCreated(mqttTopic.getFirstTopic()); - Set<String> queueNames = wildcardManager.matchQueueSetByMsgTopic(pubTopic, context.getNamespace()); + firstTopicManager.checkFirstTopicIfCreated(mqttTopic.getFirstTopic()); // Check the firstTopic is existed + Set<String> queueNames = wildcardManager.matchQueueSetByMsgTopic(pubTopic, context.getNamespace()); //According to topic to find queue + long bornTime = System.currentTimeMillis(); + + MqttPublishMessage retainedMqttPublishMessage = mqttPublishMessage.copy(); Review Comment: it can be move to if (mqttPublishMessage.fixedHeader().isRetain()) {} body ########## mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.rocketmq.mqtt.ds.meta; + + +import com.alipay.sofa.jraft.error.RemotingException; + +import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager; +import org.apache.rocketmq.mqtt.common.facade.RetainedPersistManager; +import org.apache.rocketmq.mqtt.common.model.Message; +import org.apache.rocketmq.mqtt.common.model.Subscription; + +import org.apache.rocketmq.mqtt.ds.retain.RetainedMsgClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.annotation.Resource; +import java.util.ArrayList; + + +import java.util.concurrent.CompletableFuture; + + +public class RetainedPersistManagerImpl implements RetainedPersistManager { + + private static Logger logger = LoggerFactory.getLogger(RetainedPersistManagerImpl.class); + + + @Resource + private MetaPersistManager metaPersistManager; + + public void init() { + + + } + + + + + + public CompletableFuture<Boolean> storeRetainedMessage(String topic, Message message) { + CompletableFuture<Boolean> result = new CompletableFuture<>(); + + if (!metaPersistManager.getAllFirstTopics().contains(message.getFirstTopic())) { + logger.info("Put retained message of topic {} into meta failed. Because first topic {} does not exist...", topic, message.getFirstTopic()); + result.complete(false); + return result; + } + logger.info("Start store retain msg..."); Review Comment: it should be debug log -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
