This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop_mqtt5.0
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/develop_mqtt5.0 by this push:
new 5397e58 Support some simple feature (#254)
5397e58 is described below
commit 5397e58cb7e2831af52a5725b6c4bfd682da7a09
Author: Dongyuan Pan <[email protected]>
AuthorDate: Thu May 23 10:29:09 2024 +0800
Support some simple feature (#254)
* Support user properties
* Support ContentType and payloadFormatIndicator
* add CheckPacket
* delete check packet
* fix bug: reset topic alias
* fix bug: reset topic alias
* support SUBSCRIPTION_IDENTIFIER RESPONSE_TOPIC CORRELATION_DATA
CONTENT_TYPE
---
.../rocketmq/mqtt/common/util/MessageUtil.java | 2 +-
.../mqtt/common/test/util/TestMessageUtil.java | 57 +++++++++++
.../rocketmq/mqtt/cs/session/infly/PushAction.java | 104 ++++++++++++++++-----
.../mqtt5/processor/PublishProcessor5.java | 1 +
.../rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java | 2 +-
.../rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java | 15 +++
6 files changed, 157 insertions(+), 24 deletions(-)
diff --git
a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java
b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java
index 5362445..6b86468 100644
---
a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java
+++
b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java
@@ -108,7 +108,7 @@ public class MessageUtil {
message.putUserProperty(Message.propertyCorrelationData, new
String(correlationData.value(), StandardCharsets.UTF_8));
}
- // User Properties
+ // The user properties of publish packets need to be stored, and
when pushing, they need to be brought with them
List<MqttProperties.UserProperty> userProperties =
(List<MqttProperties.UserProperty>)
mqttProperties.getProperties(USER_PROPERTY.value());
List<MqttProperties.StringPair> userPropertyList = new
ArrayList<>();
for (MqttProperties.UserProperty userProperty : userProperties) {
diff --git
a/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java
b/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java
index 7318210..6788dbd 100644
---
a/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java
+++
b/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java
@@ -17,29 +17,50 @@
package org.apache.rocketmq.mqtt.common.test.util;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.CharsetUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.util.MessageUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RESPONSE_TOPIC;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.USER_PROPERTY;
import static org.apache.rocketmq.mqtt.common.util.MessageUtil.EMPTYSTRING;
import static
org.apache.rocketmq.mqtt.common.util.MessageUtil.dealEmptyMessage;
import static
org.apache.rocketmq.mqtt.common.util.MessageUtil.removeRetainedFlag;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
public class TestMessageUtil {
@@ -119,4 +140,40 @@ public class TestMessageUtil {
newEmptyMessage.payload().readBytes(newBody);
Assert.assertArrayEquals(EMPTYSTRING.getBytes(), newBody);
}
+
+ @Test
+ public void TestMqtt5Message() {
+
+ MqttProperties props = new MqttProperties();
+ props.add(new MqttProperties.UserProperty("isSecret", "true"));
+ props.add(new MqttProperties.UserProperty("tag", "firstTag"));
+ props.add(new MqttProperties.UserProperty("tag", "secondTag"));
+
+ props.add(new
MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 100));
+ props.add(new
MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 101));
+
+ MqttFixedHeader mqttFixedHeader = new
MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false,
1);
+ MqttPublishVariableHeader variableHeader = new
MqttPublishVariableHeader("test", 0, props);
+ ByteBuf payload =
Unpooled.copiedBuffer("test".getBytes(StandardCharsets.UTF_8));
+ MqttPublishMessage publishMessage = new
MqttPublishMessage(mqttFixedHeader, variableHeader, payload);
+ Message message = MessageUtil.toMessage(publishMessage);
+
+ String mqtt5UserProperties =
message.getUserProperty(Message.propertyMqtt5UserProperty);
+ MqttPublishMessage newPublishMessage = null;
+
+ if (StringUtils.isNotBlank(mqtt5UserProperties)) {
+ ArrayList<MqttProperties.StringPair> userProperties =
JSON.parseObject(mqtt5UserProperties,
+ new TypeReference<ArrayList<MqttProperties.StringPair>>()
{}
+ );
+ MqttProperties newProps = new MqttProperties();
+ newProps.add(new MqttProperties.UserProperties(userProperties));
+ MqttPublishVariableHeader newVariableHeader = new
MqttPublishVariableHeader("test", 0, props);
+ newPublishMessage = new MqttPublishMessage(mqttFixedHeader,
variableHeader, payload);
+ }
+
+ MqttProperties checkProps =
newPublishMessage.variableHeader().properties();
+ Assert.assertEquals("true",
((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(0).value()).value);
+ Assert.assertEquals("firstTag",
((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(1).value()).value);
+ Assert.assertEquals("secondTag",
((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(2).value()).value);
+ }
}
diff --git
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
index ca044ac..feccc55 100644
---
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
+++
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
@@ -17,10 +17,14 @@
package org.apache.rocketmq.mqtt.cs.session.infly;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttProperties;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
@@ -39,8 +43,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
+import java.util.ArrayList;
import java.util.List;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RESPONSE_TOPIC;
+import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS;
import static java.lang.Math.min;
import static java.util.Objects.hash;
@@ -95,9 +104,9 @@ public class PushAction {
try {
if (session.isClean()) {
if (message.getStoreTimestamp() > 0 &&
- message.getStoreTimestamp() < session.getStartTime()) {
+ message.getStoreTimestamp() < session.getStartTime()) {
logger.warn("old msg:{},{},{},{}", session.getClientId(),
message.getMsgId(),
- message.getStoreTimestamp(), session.getStartTime());
+ message.getStoreTimestamp(),
session.getStartTime());
rollNext(session, mqttId);
return;
}
@@ -172,28 +181,37 @@ public class PushAction {
data = MqttMessageFactory.buildPublishMessage(topicName,
message.getPayload(), qos, retained, mqttId);
break;
case MQTT_5:
+ // add content type
+ if
(StringUtils.isNotBlank(message.getUserProperty(Message.propertyContentType))) {
+ mqttProperties.add(new
MqttProperties.StringProperty(CONTENT_TYPE.value(),
message.getUserProperty(Message.propertyContentType)));
+ }
+
+ // add Response Topic
+ if
(StringUtils.isNotBlank(message.getUserProperty(Message.propertyResponseTopic)))
{
+ mqttProperties.add(new
MqttProperties.StringProperty(RESPONSE_TOPIC.value(),
message.getUserProperty(Message.propertyResponseTopic)));
+ }
+
+ // add Correlation Data
+ if
(StringUtils.isNotBlank(message.getUserProperty(Message.propertyCorrelationData)))
{
+ mqttProperties.add(new
MqttProperties.StringProperty(CORRELATION_DATA.value(),
message.getUserProperty(Message.propertyCorrelationData)));
+ }
+
+ // process publish user properties
+ processUserProperties(message, mqttProperties);
+
+ // process subscription identifier
+ if (subscription.getSubscriptionIdentifier() > 0) {
+ mqttProperties.add(new
MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(),
subscription.getSubscriptionIdentifier()));
+ }
+
// TODO retain flag should be set by subscription option
- int topicAlias = ChannelInfo.getTopicAliasMaximum(channel);
- if (topicAlias > 0) {
- String topicNameTmp = "";
- if (ChannelInfo.getServerTopicAlias(channel, topicName) ==
null) {
- // allocate topic alias
- int allocateAlias = genServerTopicAlias(topicName,
topicAlias);
-
- if (ChannelInfo.getServerAliasTopic(channel,
allocateAlias) != null) {
- // conflict, reset topic <-> alias
- topicNameTmp = topicName;
- }
-
- ChannelInfo.setServerTopicAlias(channel, topicName,
allocateAlias);
- ChannelInfo.setServerAliasTopic(channel,
allocateAlias, topicName);
- }
+ boolean isRetained = message.isRetained();
- mqttProperties.add(new
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(),
ChannelInfo.getServerTopicAlias(channel, topicName)));
- data =
MqttMessageFactory.buildMqtt5PublishMessage(topicNameTmp, message.getPayload(),
qos, retained, mqttId, mqttProperties);
+ // process topic alias
+ if (!processTopicAlias(channel, topicName, mqttProperties)) {
+ data = MqttMessageFactory.buildMqtt5PublishMessage("",
message.getPayload(), qos, isRetained, mqttId, mqttProperties);
} else {
- // no alias
- data =
MqttMessageFactory.buildMqtt5PublishMessage(topicName, message.getPayload(),
qos, retained, mqttId, mqttProperties);
+ data =
MqttMessageFactory.buildMqtt5PublishMessage(topicName, message.getPayload(),
qos, isRetained, mqttId, mqttProperties);
}
break;
default:
@@ -207,12 +225,54 @@ public class PushAction {
message.setRetry(message.getRetry() + 1);
logger.warn("retryPush:{},{},{}", session.getClientId(),
message.getMsgId(), message.getRetry());
} else if (subscription.isShare()) {
- String lmqTopic = MixAll.LMQ_PREFIX +
StringUtils.replace(message.getOriginTopic(), "/","%");
+ String lmqTopic = MixAll.LMQ_PREFIX +
StringUtils.replace(message.getOriginTopic(), "/", "%");
lmqQueueStore.popAck(lmqTopic, subscription.getSharedName(),
message);
}
});
}
+    /**
+     * Copies the MQTT 5 user properties carried by a stored message into the properties of
+     * the PUBLISH packet about to be pushed.
+     *
+     * <p>The value of the {@code Message.propertyMqtt5UserProperty} user property is parsed
+     * as a JSON list of {@code MqttProperties.StringPair}; when that value is blank nothing
+     * is added.
+     *
+     * @param message        stored message whose user property is read
+     * @param mqttProperties properties of the outgoing PUBLISH; receives one
+     *                       {@code MqttProperties.UserProperties} entry when a value is present
+     */
+    public void processUserProperties(Message message, MqttProperties mqttProperties) {
+        String storedJson = message.getUserProperty(Message.propertyMqtt5UserProperty);
+        if (StringUtils.isBlank(storedJson)) {
+            return;
+        }
+
+        TypeReference<ArrayList<MqttProperties.StringPair>> pairListType =
+            new TypeReference<ArrayList<MqttProperties.StringPair>>() {
+            };
+        ArrayList<MqttProperties.StringPair> pairs = JSON.parseObject(storedJson, pairListType);
+        mqttProperties.add(new MqttProperties.UserProperties(pairs));
+    }
+
+ /**
+ * process topic alias
+ * @param channel
+ * @param topicName
+ * @param mqttProperties
+ * @return true: conflict when allocated topic alias
+ */
+ public boolean processTopicAlias(Channel channel, String topicName,
MqttProperties mqttProperties) {
+ int topicAlias = ChannelInfo.getTopicAliasMaximum(channel);
+ boolean conflict = false;
+
+ if (topicAlias > 0) {
+ if (ChannelInfo.getServerTopicAlias(channel, topicName) == null) {
+ // allocate topic alias
+ int allocateAlias = genServerTopicAlias(topicName, topicAlias);
+
+ if (ChannelInfo.getServerAliasTopic(channel, allocateAlias) !=
null) {
+ // conflict, client will reset topic <-> alias
+ conflict = true;
+ }
+
+ ChannelInfo.setServerTopicAlias(channel, topicName,
allocateAlias);
+ ChannelInfo.setServerAliasTopic(channel, allocateAlias,
topicName);
+ }
+
+ // topic has allocated topic alias,just set to mqttProperties
+ mqttProperties.add(new
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(),
ChannelInfo.getServerTopicAlias(channel, topicName)));
+ }
+ return conflict;
+ }
+
/**
 * Derives a server-side topic alias for a topic name from its hash.
 *
 * <p>{@code Math.floorMod} is used instead of {@code %} because the hash may be negative;
 * with {@code %} a negative hash would produce an alias of 0 or below, which is outside
 * the valid alias range.
 *
 * @param topicName         topic to derive the alias for
 * @param topicAliasMaximum highest alias the receiver accepts; must be greater than 0
 * @return an alias in the range {@code 1..topicAliasMaximum}
 */
public int genServerTopicAlias(String topicName, int topicAliasMaximum) {
    return Math.floorMod(hash(topicName), topicAliasMaximum) + 1;
}
diff --git
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java
index 2cdee66..4c3bb89 100644
---
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java
+++
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java
@@ -72,6 +72,7 @@ public class PublishProcessor5 implements UpstreamProcessor5 {
public CompletableFuture<StoreResult> put(MqttMessageUpContext context,
MqttMessage mqttMessage) {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage)
mqttMessage;
+ // process topic alias
final MqttPublishVariableHeader variableHeaderTmp =
mqttPublishMessage.variableHeader();
MqttProperties mqttProperties = variableHeaderTmp.properties();
if (mqttProperties != null && context.getClientTopicAliasMap() !=
null) {
diff --git
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java
index f40dd94..c139d40 100644
---
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java
+++
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java
@@ -62,7 +62,7 @@ public class Mqtt5Consumer {
try {
String payload = new String(message.getPayload());
String[] ss = payload.split("_");
- System.out.println(now() + "receive:" + topic + "," +
payload);
+ System.out.println(now() + "receive:" + topic + "," +
payload + ", properties: " + message.getProperties());
} catch (Exception e) {
e.printStackTrace();
}
diff --git
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java
index 4968f9e..4ae1876 100644
---
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java
+++
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java
@@ -27,12 +27,15 @@ import
org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
+import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
public class Mqtt5Producer {
public static void main(String[] args) throws InterruptedException,
MqttException, NoSuchAlgorithmException, InvalidKeyException {
@@ -82,6 +85,18 @@ public class Mqtt5Producer {
for (int i = 0; i < 1000; i++) {
String msg = "r1_" + System.currentTimeMillis() + "_" + i;
MqttMessage message = new
MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
+
+ // user properties
+ MqttProperties mqttProperties = new MqttProperties();
+ List<UserProperty> userProperties = new ArrayList<>();
+ userProperties.add(new UserProperty("tag", "r1"));
+ userProperties.add(new UserProperty("tag", "r11"));
+ mqttProperties.setUserProperties(userProperties);
+
+ // content type
+ mqttProperties.setContentType("text/plain");
+ message.setProperties(mqttProperties);
+
message.setQos(1);
String mqttSendTopic = firstTopic + "/r1";
mqttClient.publish(mqttSendTopic, message);