This is an automated email from the ASF dual-hosted git repository.

pingww pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/develop by this push:
     new af1cdf9  fix unit test
     new af04be9  Merge pull request #156 from DongyuanPan/develop_fix_unit_test
af1cdf9 is described below

commit af1cdf92cd0409500669d06634104f667dca7ef5
Author: dongyuan.pdy <[email protected]>
AuthorDate: Thu Oct 20 15:54:33 2022 +0800

    fix unit test
---
 .../mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java       | 10 +++++++---
 .../java/org/apache/rocketmq/mqtt/example/MqttConsumer.java    |  8 ++++----
 .../java/org/apache/rocketmq/mqtt/example/MqttProducer.java    |  8 ++++----
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
index 0e9a7e8..b30711b 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
@@ -18,12 +18,14 @@
 package org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler;
 
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
-import org.apache.rocketmq.mqtt.cs.protocol.mqtt.facotry.MqttMessageFactory;
 import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
 import org.springframework.stereotype.Component;
 
@@ -31,7 +33,8 @@ import javax.annotation.Resource;
 
 @Component
 public class MqttPubRecHandler implements MqttPacketHandler<MqttMessage> {
-
+    private final MqttFixedHeader pubRelMqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false,
+            MqttQoS.AT_LEAST_ONCE, false, 0);
     @Resource
     private RetryDriver retryDriver;
 
@@ -47,6 +50,7 @@ public class MqttPubRecHandler implements MqttPacketHandler<MqttMessage> {
         retryDriver.unMountPublish(variableHeader.messageId(), channelId);
         retryDriver.mountPubRel(variableHeader.messageId(), channelId);
 
-        ctx.channel().writeAndFlush(MqttMessageFactory.buildPubRelMessage(variableHeader));
+        MqttMessage pubRelMqttMessage = new MqttMessage(pubRelMqttFixedHeader, variableHeader);
+        ctx.channel().writeAndFlush(pubRelMqttMessage);
     }
 }
diff --git a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttConsumer.java b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttConsumer.java
index 9ded30f..1c46f75 100644
--- a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttConsumer.java
+++ b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttConsumer.java
@@ -33,9 +33,9 @@ import java.util.Date;
 
 public class MqttConsumer {
     public static void main(String[] args) throws MqttException, NoSuchAlgorithmException, InvalidKeyException {
-        String brokerUrl = System.getenv("brokerUrl");
+        String brokerUrl = "tcp://11.164.2.4:1883";
+        String firstTopic = "dongyuan-f1";
         MemoryPersistence memoryPersistence = new MemoryPersistence();
-        String firstTopic = System.getenv("firstTopic");
         String recvClientId = "recv01";
         MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(recvClientId);
         MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
@@ -89,8 +89,8 @@ public class MqttConsumer {
         connOpts.setKeepAliveInterval(60);
         connOpts.setAutomaticReconnect(true);
         connOpts.setMaxInflight(10000);
-        connOpts.setUserName(System.getenv("username"));
-        connOpts.setPassword(HmacSHA1Util.macSignature(clientId, System.getenv("secretKey")).toCharArray());
+        connOpts.setUserName("passwd");
+        connOpts.setPassword(HmacSHA1Util.macSignature(clientId, "passwd").toCharArray());
         return connOpts;
     }
 
diff --git a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttProducer.java b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttProducer.java
index 7b20e7a..058328b 100644
--- a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttProducer.java
+++ b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttProducer.java
@@ -35,8 +35,8 @@ import java.util.Date;
 public class MqttProducer {
     public static void main(String[] args) throws InterruptedException, MqttException, NoSuchAlgorithmException, InvalidKeyException {
         MemoryPersistence memoryPersistence = new MemoryPersistence();
-        String brokerUrl = System.getenv("brokerUrl");
-        String firstTopic = System.getenv("firstTopic");
+        String brokerUrl = "tcp://11.164.2.4:1883";
+        String firstTopic = "dongyuan-f1";
         String sendClientId = "send01";
         String recvClientId = "recv01";
         MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(sendClientId);
@@ -100,8 +100,8 @@ public class MqttProducer {
         connOpts.setKeepAliveInterval(60);
         connOpts.setAutomaticReconnect(true);
         connOpts.setMaxInflight(10000);
-        connOpts.setUserName(System.getenv("username"));
-        connOpts.setPassword(HmacSHA1Util.macSignature(clientId, System.getenv("secretKey")).toCharArray());
+        connOpts.setUserName("passwd");
+        connOpts.setPassword(HmacSHA1Util.macSignature(clientId, "passwd").toCharArray());
         return connOpts;
     }
 

Reply via email to