This is an automated email from the ASF dual-hosted git repository.
pingww pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/develop by this push:
new af1cdf9 fix unit test
new af04be9 Merge pull request #156 from DongyuanPan/develop_fix_unit_test
af1cdf9 is described below
commit af1cdf92cd0409500669d06634104f667dca7ef5
Author: dongyuan.pdy <[email protected]>
AuthorDate: Thu Oct 20 15:54:33 2022 +0800
fix unit test
---
.../mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java | 10 +++++++---
.../java/org/apache/rocketmq/mqtt/example/MqttConsumer.java | 8 ++++----
.../java/org/apache/rocketmq/mqtt/example/MqttProducer.java | 8 ++++----
3 files changed, 15 insertions(+), 11 deletions(-)
diff --git
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
index 0e9a7e8..b30711b 100644
---
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
+++
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
@@ -18,12 +18,14 @@
package org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
-import org.apache.rocketmq.mqtt.cs.protocol.mqtt.facotry.MqttMessageFactory;
import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
import org.springframework.stereotype.Component;
@@ -31,7 +33,8 @@ import javax.annotation.Resource;
@Component
public class MqttPubRecHandler implements MqttPacketHandler<MqttMessage> {
-
+ private final MqttFixedHeader pubRelMqttFixedHeader = new
MqttFixedHeader(MqttMessageType.PUBREL, false,
+ MqttQoS.AT_LEAST_ONCE, false, 0);
@Resource
private RetryDriver retryDriver;
@@ -47,6 +50,7 @@ public class MqttPubRecHandler implements
MqttPacketHandler<MqttMessage> {
retryDriver.unMountPublish(variableHeader.messageId(), channelId);
retryDriver.mountPubRel(variableHeader.messageId(), channelId);
-
ctx.channel().writeAndFlush(MqttMessageFactory.buildPubRelMessage(variableHeader));
+ MqttMessage pubRelMqttMessage = new MqttMessage(pubRelMqttFixedHeader,
variableHeader);
+ ctx.channel().writeAndFlush(pubRelMqttMessage);
}
}
diff --git
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttConsumer.java
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttConsumer.java
index 9ded30f..1c46f75 100644
---
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttConsumer.java
+++
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttConsumer.java
@@ -33,9 +33,9 @@ import java.util.Date;
public class MqttConsumer {
public static void main(String[] args) throws MqttException,
NoSuchAlgorithmException, InvalidKeyException {
- String brokerUrl = System.getenv("brokerUrl");
+ String brokerUrl = "tcp://11.164.2.4:1883";
+ String firstTopic = "dongyuan-f1";
MemoryPersistence memoryPersistence = new MemoryPersistence();
- String firstTopic = System.getenv("firstTopic");
String recvClientId = "recv01";
MqttConnectOptions mqttConnectOptions =
buildMqttConnectOptions(recvClientId);
MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId,
memoryPersistence);
@@ -89,8 +89,8 @@ public class MqttConsumer {
connOpts.setKeepAliveInterval(60);
connOpts.setAutomaticReconnect(true);
connOpts.setMaxInflight(10000);
- connOpts.setUserName(System.getenv("username"));
- connOpts.setPassword(HmacSHA1Util.macSignature(clientId,
System.getenv("secretKey")).toCharArray());
+ connOpts.setUserName("passwd");
+ connOpts.setPassword(HmacSHA1Util.macSignature(clientId,
"passwd").toCharArray());
return connOpts;
}
diff --git
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttProducer.java
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttProducer.java
index 7b20e7a..058328b 100644
---
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttProducer.java
+++
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttProducer.java
@@ -35,8 +35,8 @@ import java.util.Date;
public class MqttProducer {
public static void main(String[] args) throws InterruptedException,
MqttException, NoSuchAlgorithmException, InvalidKeyException {
MemoryPersistence memoryPersistence = new MemoryPersistence();
- String brokerUrl = System.getenv("brokerUrl");
- String firstTopic = System.getenv("firstTopic");
+ String brokerUrl = "tcp://11.164.2.4:1883";
+ String firstTopic = "dongyuan-f1";
String sendClientId = "send01";
String recvClientId = "recv01";
MqttConnectOptions mqttConnectOptions =
buildMqttConnectOptions(sendClientId);
@@ -100,8 +100,8 @@ public class MqttProducer {
connOpts.setKeepAliveInterval(60);
connOpts.setAutomaticReconnect(true);
connOpts.setMaxInflight(10000);
- connOpts.setUserName(System.getenv("username"));
- connOpts.setPassword(HmacSHA1Util.macSignature(clientId,
System.getenv("secretKey")).toCharArray());
+ connOpts.setUserName("passwd");
+ connOpts.setPassword(HmacSHA1Util.macSignature(clientId,
"passwd").toCharArray());
return connOpts;
}