This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/main by this push:
     new 1393642  code optimization and codeCov enhancing of 
mqtt.cs.protocol.mqtt.handler for #22 (#63)
1393642 is described below

commit 1393642c2397a72077892771883be9d5750611e5
Author: YongXing <[email protected]>
AuthorDate: Mon Apr 11 10:21:00 2022 +0800

    code optimization and codeCov enhancing of mqtt.cs.protocol.mqtt.handler 
for #22 (#63)
    
    Co-authored-by: AhaThinking <[email protected]>
---
 .../protocol/mqtt/handler/MqttConnectHandler.java  |  10 +-
 .../cs/protocol/mqtt/handler/MqttPingHandler.java  |   8 --
 .../protocol/mqtt/handler/MqttPubAckHandler.java   |   4 +-
 .../protocol/mqtt/handler/MqttPubCompHandler.java  |  12 +-
 .../protocol/mqtt/handler/MqttPubRecHandler.java   |  14 +-
 .../protocol/mqtt/handler/MqttPublishHandler.java  |  32 ++---
 .../mqtt/handler/TestMqttConnectHandler.java       | 135 ++++++++++++++++++
 .../mqtt/handler/TestMqttDisconnectHandler.java    |  62 +++++++++
 .../protocol/mqtt/handler/TestMqttPingHandler.java |  63 +++++++++
 .../mqtt/handler/TestMqttPubAckHandler.java        | 103 ++++++++++++++
 .../mqtt/handler/TestMqttPubCompHandler.java       |  92 ++++++++++++
 .../mqtt/handler/TestMqttPubRecHandler.java        | 102 ++++++++++++++
 .../mqtt/handler/TestMqttPubRelHandler.java        |  87 ++++++++++++
 .../mqtt/handler/TestMqttPublishHandler.java       | 154 +++++++++++++++++++++
 14 files changed, 816 insertions(+), 62 deletions(-)

diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
index ecf3390..9a09a5d 100644
--- 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
+++ 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
@@ -33,7 +33,6 @@ import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
-import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
 import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
 import org.slf4j.Logger;
@@ -56,9 +55,6 @@ public class MqttConnectHandler implements 
MqttPacketHandler<MqttConnectMessage>
     @Resource
     private SessionLoop sessionLoop;
 
-    @Resource
-    private ConnectConf connectConf;
-
     private ScheduledThreadPoolExecutor scheduler = new 
ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("check_connect_future"));
 
     @Override
@@ -73,10 +69,6 @@ public class MqttConnectHandler implements 
MqttPacketHandler<MqttConnectMessage>
         if (!upstreamHookResult.isSuccess()) {
             byte connAckCode = (byte) upstreamHookResult.getSubCode();
             MqttConnectReturnCode mqttConnectReturnCode = 
MqttConnectReturnCode.valueOf(connAckCode);
-            if (mqttConnectReturnCode == null) {
-                channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, 
remark);
-                return;
-            }
             
channel.writeAndFlush(getMqttConnAckMessage(mqttConnectReturnCode));
             channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, 
remark);
             return;
@@ -84,6 +76,8 @@ public class MqttConnectHandler implements 
MqttPacketHandler<MqttConnectMessage>
 
         CompletableFuture<Void> future = new CompletableFuture<>();
         ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);
+
+        // use 'scheduler' to separate two i/o: 'ack to client' and 
'session-load from rocketmq'
         scheduler.schedule(() -> {
             if (!future.isDone()) {
                 future.complete(null);
diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java
 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java
index 7fa1fbd..6a5e42d 100644
--- 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java
+++ 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPingHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler;
 
-
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
@@ -26,23 +25,16 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
-import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.Resource;
-
-
 
 @Component
 public class MqttPingHandler implements MqttPacketHandler<MqttMessage> {
     private static Logger logger = 
LoggerFactory.getLogger(MqttPingHandler.class);
 
-    @Resource
-    private ChannelManager channelManager;
-
     @Override
     public void doHandler(ChannelHandlerContext ctx, MqttMessage mqttMessage, 
HookResult upstreamHookResult) {
         MqttFixedHeader mqttFixedHeader =
diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java
 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java
index ce9f2ea..77e4062 100644
--- 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java
+++ 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubAckHandler.java
@@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttPubAckMessage;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
+import org.apache.rocketmq.mqtt.cs.session.Session;
 import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
 import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
 import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
@@ -51,6 +52,7 @@ public class MqttPubAckHandler implements 
MqttPacketHandler<MqttPubAckMessage> {
     public void doHandler(ChannelHandlerContext ctx, MqttPubAckMessage 
mqttMessage, HookResult upstreamHookResult) {
         int messageId = mqttMessage.variableHeader().messageId();
         retryDriver.unMountPublish(messageId, 
ChannelInfo.getId(ctx.channel()));
-        
pushAction.rollNextByAck(sessionLoop.getSession(ChannelInfo.getId(ctx.channel())),
 messageId);
+        Session session = 
sessionLoop.getSession(ChannelInfo.getId(ctx.channel()));
+        pushAction.rollNextByAck(session, messageId);
     }
 }
diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java
 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java
index 27a805c..606027a 100644
--- 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java
+++ 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubCompHandler.java
@@ -23,8 +23,7 @@ import 
io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
-import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
-import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
+import org.apache.rocketmq.mqtt.cs.session.Session;
 import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
 import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
 import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
@@ -39,12 +38,6 @@ public class MqttPubCompHandler implements 
MqttPacketHandler<MqttMessage> {
     @Resource
     private RetryDriver retryDriver;
 
-    @Resource
-    private InFlyCache inFlyCache;
-
-    @Resource
-    private MqttMsgId mqttMsgId;
-
     @Resource
     private PushAction pushAction;
 
@@ -59,7 +52,8 @@ public class MqttPubCompHandler implements 
MqttPacketHandler<MqttMessage> {
         retryDriver.unMountPubRel(variableHeader.messageId(), 
ChannelInfo.getId(ctx.channel()));
 
         //The Packet Identifier becomes available for reuse once the Sender 
has received the PUBCOMP Packet.
-        pushAction.rollNextByAck(sessionLoop.getSession(channelId), 
variableHeader.messageId());
+        Session session = sessionLoop.getSession(channelId);
+        pushAction.rollNextByAck(session, variableHeader.messageId());
     }
 
 }
diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
index 258ebf3..49e1dcb 100644
--- 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
+++ 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPubRecHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler;
 
-
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
@@ -27,26 +26,19 @@ import io.netty.handler.codec.mqtt.MqttQoS;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
-import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
 import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
-import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 
-
 @Component
 public class MqttPubRecHandler implements MqttPacketHandler<MqttMessage> {
+    private final MqttFixedHeader pubRelMqttFixedHeader = new 
MqttFixedHeader(MqttMessageType.PUBREL, false,
+            MqttQoS.AT_LEAST_ONCE, false, 0);
 
     @Resource
     private RetryDriver retryDriver;
 
-    @Resource
-    private InFlyCache inFlyCache;
-
-    @Resource
-    private SessionLoop sessionLoop;
-
     @Override
     public void doHandler(ChannelHandlerContext ctx, MqttMessage mqttMessage, 
HookResult upstreamHookResult) {
         MqttMessageIdVariableHeader variableHeader = 
(MqttMessageIdVariableHeader) mqttMessage.variableHeader();
@@ -54,8 +46,6 @@ public class MqttPubRecHandler implements 
MqttPacketHandler<MqttMessage> {
         retryDriver.unMountPublish(variableHeader.messageId(), channelId);
         retryDriver.mountPubRel(variableHeader.messageId(), channelId);
 
-        MqttFixedHeader pubRelMqttFixedHeader = new 
MqttFixedHeader(MqttMessageType.PUBREL, false,
-                MqttQoS.AT_LEAST_ONCE, false, 0);
         MqttMessage pubRelMqttMessage = new MqttMessage(pubRelMqttFixedHeader, 
variableHeader);
         ctx.channel().writeAndFlush(pubRelMqttMessage);
     }
diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
index a8d17d2..c5594bc 100644
--- 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
+++ 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttPublishHandler.java
@@ -31,10 +31,8 @@ import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
-import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
 import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
-import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -51,13 +49,6 @@ public class MqttPublishHandler implements 
MqttPacketHandler<MqttPublishMessage>
     @Resource
     private ChannelManager channelManager;
 
-    @Resource
-    private SessionLoop sessionLoop;
-
-    @Resource
-    private ConnectConf connectConf;
-
-
     @Override
     public void doHandler(ChannelHandlerContext ctx,
                           MqttPublishMessage mqttMessage,
@@ -71,16 +62,11 @@ public class MqttPublishHandler implements 
MqttPacketHandler<MqttPublishMessage>
             return;
         }
 
-        final boolean isQos2Message = isQos2Message(mqttMessage);
-        if (isQos2Message) {
-            if (inFlyCache.contains(InFlyCache.CacheType.PUB, channelId, 
variableHeader.messageId())) {
-                doResponse(ctx, mqttMessage);
-                return;
-            }
-        }
         doResponse(ctx, mqttMessage);
-        if (isQos2Message) {
-            inFlyCache.put(InFlyCache.CacheType.PUB, channelId, 
variableHeader.messageId());
+
+        final boolean isQos2Message = isQos2Message(mqttMessage);
+        if (isQos2Message && !inFlyCache.contains(InFlyCache.CacheType.PUB, 
channelId, variableHeader.packetId())) {
+            inFlyCache.put(InFlyCache.CacheType.PUB, channelId, 
variableHeader.packetId());
         }
     }
 
@@ -96,19 +82,17 @@ public class MqttPublishHandler implements 
MqttPacketHandler<MqttPublishMessage>
                 break;
             case AT_LEAST_ONCE:
                 MqttFixedHeader mqttFixedHeader = new 
MqttFixedHeader(MqttMessageType.PUBACK, false,
-                    MqttQoS.AT_MOST_ONCE,
-                    false, 0);
+                    MqttQoS.AT_MOST_ONCE, false, 0);
                 MqttMessageIdVariableHeader mqttMessageIdVariableHeader = 
MqttMessageIdVariableHeader
-                    .from(variableHeader.messageId());
+                    .from(variableHeader.packetId());
                 MqttPubAckMessage pubackMessage = new 
MqttPubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
                 ctx.channel().writeAndFlush(pubackMessage);
                 break;
             case EXACTLY_ONCE:
                 MqttFixedHeader pubrecMqttHeader = new 
MqttFixedHeader(MqttMessageType.PUBREC, false,
-                    MqttQoS.AT_MOST_ONCE,
-                    false, 0);
+                    MqttQoS.AT_MOST_ONCE, false, 0);
                 MqttMessageIdVariableHeader pubrecMessageIdVariableHeader = 
MqttMessageIdVariableHeader
-                    .from(variableHeader.messageId());
+                    .from(variableHeader.packetId());
                 MqttMessage pubrecMqttMessage = new 
MqttMessage(pubrecMqttHeader, pubrecMessageIdVariableHeader);
                 ctx.channel().writeAndFlush(pubrecMqttMessage);
                 break;
diff --git 
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttConnectHandler.java
 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttConnectHandler.java
new file mode 100644
index 0000000..4ff35ef
--- /dev/null
+++ 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttConnectHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttConnAckMessage;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
+import io.netty.handler.codec.mqtt.MqttConnectPayload;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.common.model.Remark;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
+import org.apache.rocketmq.mqtt.cs.channel.DefaultChannelManager;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttConnectHandler;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttConnectHandler {
+    private MqttConnectHandler connectHandler;
+    private MqttConnectMessage connectMessage;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private DefaultChannelManager channelManager;
+
+    @Mock
+    private SessionLoop sessionLoop;
+
+    @Before
+    public void setUp() throws IllegalAccessException {
+        MqttFixedHeader mqttFixedHeader = new 
MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
+        MqttConnectVariableHeader variableHeader = new 
MqttConnectVariableHeader(null, 0, false,
+            false, false, 0, false, true, 1);
+        MqttConnectPayload payload = new MqttConnectPayload("testConnHandler", 
null, (byte[]) null, null, null);
+        connectMessage = new MqttConnectMessage(mqttFixedHeader, 
variableHeader, payload);
+
+        connectHandler = new MqttConnectHandler();
+        FieldUtils.writeDeclaredField(connectHandler, "channelManager", 
channelManager, true);
+        FieldUtils.writeDeclaredField(connectHandler, "sessionLoop", 
sessionLoop, true);
+
+        when(ctx.channel()).thenReturn(channel);
+    }
+
+    @After
+    public void After() {}
+
+    @Test
+    public void testDoHandlerAuthFailed() {
+        HookResult authFailHook = new HookResult(HookResult.FAIL,
+            
MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD.byteValue(), 
Remark.AUTH_FAILED, null);
+        doReturn(null).when(channel).writeAndFlush(any());
+        doNothing().when(channelManager).closeConnect(channel, 
ChannelCloseFrom.SERVER, Remark.AUTH_FAILED);
+
+        connectHandler.doHandler(ctx, connectMessage, authFailHook);
+
+        verify(channel).writeAndFlush(any());
+        verify(channelManager).closeConnect(channel, ChannelCloseFrom.SERVER, 
Remark.AUTH_FAILED);
+        verifyNoMoreInteractions(channelManager, sessionLoop);
+    }
+
+    @Test
+    public void testDoHandlerChannelInActive() {
+        HookResult hookResult = new HookResult(HookResult.SUCCESS, 
Remark.SUCCESS, null);
+        doReturn(false).when(channel).isActive();
+        doNothing().when(sessionLoop).loadSession(any(), any());
+
+        connectHandler.doHandler(ctx, connectMessage, hookResult);
+
+        // wait scheduler execution
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ignored) {}
+
+        verify(sessionLoop).loadSession(any(), any());
+        verifyNoMoreInteractions(channelManager, sessionLoop);
+    }
+
+    @Test
+    public void testDoHandlerSuccess() {
+        HookResult hookResult = new HookResult(HookResult.SUCCESS, 
Remark.SUCCESS, null);
+        doReturn(true).when(channel).isActive();
+        doNothing().when(sessionLoop).loadSession(any(), any());
+
+        connectHandler.doHandler(ctx, connectMessage, hookResult);
+
+        // wait scheduler execution
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ignored) {}
+
+        verify(channel).writeAndFlush(any(MqttConnAckMessage.class));
+        verify(sessionLoop).loadSession(any(), any());
+        verifyNoMoreInteractions(channelManager, sessionLoop);
+    }
+}
diff --git 
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttDisconnectHandler.java
 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttDisconnectHandler.java
new file mode 100644
index 0000000..744dd53
--- /dev/null
+++ 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttDisconnectHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttDisconnectHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttDisconnectHandler {
+
+    private MqttDisconnectHandler disconnectHandler;
+
+    @Mock
+    private ChannelManager channelManager;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private MqttMessage mqttMessage;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Test
+    public void testDoHandler() throws IllegalAccessException {
+        disconnectHandler = new MqttDisconnectHandler();
+        FieldUtils.writeDeclaredField(disconnectHandler, "channelManager", 
channelManager, true);
+
+        disconnectHandler.doHandler(ctx, mqttMessage, hookResult);
+        verify(channelManager).closeConnect(any(), any(), any());
+        verifyNoMoreInteractions(channelManager);
+    }
+}
\ No newline at end of file
diff --git 
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPingHandler.java
 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPingHandler.java
new file mode 100644
index 0000000..3b3037c
--- /dev/null
+++ 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPingHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPingHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPingHandler {
+
+    private MqttPingHandler pingHandler;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private MqttMessage mqttMessage;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Test
+    public void testDoHandler() {
+        pingHandler = new MqttPingHandler();
+        when(ctx.channel()).thenReturn(channel);
+        doReturn(null).when(channel).writeAndFlush(any());
+
+        pingHandler.doHandler(ctx, mqttMessage, hookResult);
+        verify(ctx).channel();
+        verify(channel).writeAndFlush(any());
+    }
+}
diff --git 
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubAckHandler.java
 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubAckHandler.java
new file mode 100644
index 0000000..646d835
--- /dev/null
+++ 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubAckHandler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubAckHandler;
+import org.apache.rocketmq.mqtt.cs.session.Session;
+import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubAckHandler {
+    private MqttPubAckHandler pubAckHandler;
+    private MqttMessageIdVariableHeader variableHeader;
+    private MqttPubAckMessage pubAckMessage;
+    private MqttFixedHeader fixedHeader;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Mock
+    private PushAction pushAction;
+
+    @Mock
+    private RetryDriver retryDriver;
+
+    @Mock
+    private SessionLoop sessionLoop;
+
+    @Mock
+    private Session session;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Before
+    public void setUp() throws Exception {
+        variableHeader = MqttMessageIdVariableHeader.from(007);
+        fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, 
MqttQoS.AT_MOST_ONCE, false, 0);
+        pubAckMessage = new MqttPubAckMessage(fixedHeader, variableHeader);
+    }
+
+    @Test
+    public void testDoHandler() throws IllegalAccessException {
+        pubAckHandler = new MqttPubAckHandler();
+        FieldUtils.writeDeclaredField(pubAckHandler, "pushAction", pushAction, 
true);
+        FieldUtils.writeDeclaredField(pubAckHandler, "retryDriver", 
retryDriver, true);
+        FieldUtils.writeDeclaredField(pubAckHandler, "sessionLoop", 
sessionLoop, true);
+
+        when(ctx.channel()).thenReturn(channel);
+        doReturn(null).when(retryDriver).unMountPublish(anyInt(), anyString());
+        doReturn(session).when(sessionLoop).getSession(anyString());
+        doNothing().when(pushAction).rollNextByAck(any(), anyInt());
+
+        pubAckHandler.doHandler(ctx, pubAckMessage, hookResult);
+        verify(ctx, times(2)).channel();
+        verify(sessionLoop).getSession(anyString());
+        verify(pushAction).rollNextByAck(eq(session), anyInt());
+    }
+}
diff --git 
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubCompHandler.java
 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubCompHandler.java
new file mode 100644
index 0000000..a500709
--- /dev/null
+++ 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubCompHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubCompHandler;
+import org.apache.rocketmq.mqtt.cs.session.Session;
+import org.apache.rocketmq.mqtt.cs.session.infly.PushAction;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubCompHandler {
+    private MqttPubCompHandler pubCompHandler;
+
+    @Mock
+    private RetryDriver retryDriver;
+
+    @Mock
+    private PushAction pushAction;
+
+    @Mock
+    private SessionLoop sessionLoop;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private MqttMessage mqttMessage;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Mock
+    private Session session;
+
+    @Test
+    public void testDoHandler() throws IllegalAccessException {
+        pubCompHandler = new MqttPubCompHandler();
+        FieldUtils.writeDeclaredField(pubCompHandler, "retryDriver", 
retryDriver, true);
+        FieldUtils.writeDeclaredField(pubCompHandler, "pushAction", 
pushAction, true);
+        FieldUtils.writeDeclaredField(pubCompHandler, "sessionLoop", 
sessionLoop, true);
+
+        MqttMessageIdVariableHeader variableHeader = 
MqttMessageIdVariableHeader.from(666);
+        when(mqttMessage.variableHeader()).thenReturn(variableHeader);
+        when(ctx.channel()).thenReturn(channel);
+        when(sessionLoop.getSession(any())).thenReturn(session);
+
+        pubCompHandler.doHandler(ctx, mqttMessage, hookResult);
+        verify(mqttMessage).variableHeader();
+        verify(ctx, times(2)).channel();
+        verify(retryDriver).unMountPubRel(anyInt(), anyString());
+        verify(sessionLoop).getSession(anyString());
+        verify(pushAction).rollNextByAck(eq(session), anyInt());
+    }
+}
diff --git 
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRecHandler.java
 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRecHandler.java
new file mode 100644
index 0000000..c62d12d
--- /dev/null
+++ 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRecHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubRecHandler;
+import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubRecHandler {
+
+    private MqttPubRecHandler pubRecHandler;
+
+    private final int messageId = 666;
+    private final MqttMessageIdVariableHeader variableHeader = 
MqttMessageIdVariableHeader.from(messageId);
+    private MqttFixedHeader expectedPubRelFixHeader;
+
+    @Mock
+    private RetryDriver retryDriver;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private MqttMessage mqttMessage;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Before
+    public void setUp() throws Exception {
+        pubRecHandler = new MqttPubRecHandler();
+        FieldUtils.writeDeclaredField(pubRecHandler, "retryDriver", 
retryDriver, true);
+
+        expectedPubRelFixHeader = new MqttFixedHeader(MqttMessageType.PUBREL, 
false, MqttQoS.AT_LEAST_ONCE, false, 0);
+    }
+
+    @Test
+    public void testDoHandler() throws Exception {
+        when(ctx.channel()).thenReturn(channel);
+        when(mqttMessage.variableHeader()).thenReturn(variableHeader);
+        doReturn(null).when(channel).writeAndFlush(any(MqttMessage.class));
+
+        pubRecHandler.doHandler(ctx, mqttMessage, hookResult);
+
+        verify(ctx, times(2)).channel();
+        verify(mqttMessage).variableHeader();
+        verify(retryDriver).unMountPublish(eq(messageId), anyString());
+        verify(retryDriver).mountPubRel(eq(messageId), anyString());
+        verify(channel).writeAndFlush(any(MqttMessage.class));
+        verifyNoMoreInteractions(retryDriver, ctx, mqttMessage);
+
+        // check qosLevel of flushed pub-rel-mqtt-fixed-header
+        Field testFixedHeader = 
pubRecHandler.getClass().getDeclaredField("pubRelMqttFixedHeader");
+        testFixedHeader.setAccessible(true);
+        Assert.assertEquals(expectedPubRelFixHeader.toString(), 
testFixedHeader.get(pubRecHandler).toString());
+    }
+}
diff --git 
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRelHandler.java
 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRelHandler.java
new file mode 100644
index 0000000..dcb3e03
--- /dev/null
+++ 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPubRelHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPubRelHandler;
+import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPubRelHandler {
+
+    private MqttPubRelHandler pubRelHandler;
+
+    private final int messageId = 666;
+    private final MqttMessageIdVariableHeader variableHeader = 
MqttMessageIdVariableHeader.from(messageId);
+
+    @Mock
+    private InFlyCache inFlyCache;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Mock
+    private MqttMessage mqttMessage;
+
+    @Mock
+    private HookResult hookResult;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Before
+    public void setUp() throws Exception {
+        pubRelHandler = new MqttPubRelHandler();
+        FieldUtils.writeDeclaredField(pubRelHandler, "inFlyCache", inFlyCache, 
true);
+    }
+
+    @Test
+    public void testDoHandler() {
+        when(ctx.channel()).thenReturn(channel);
+        when(mqttMessage.variableHeader()).thenReturn(variableHeader);
+        doReturn(null).when(channel).writeAndFlush(any(MqttMessage.class));
+
+        pubRelHandler.doHandler(ctx, mqttMessage, hookResult);
+
+        verify(ctx, times(2)).channel();
+        verify(mqttMessage).variableHeader();
+        verify(inFlyCache).remove(eq(InFlyCache.CacheType.PUB), anyString(), 
eq(messageId));
+        verify(channel).writeAndFlush(any(MqttMessage.class));
+        verifyNoMoreInteractions(inFlyCache, ctx, mqttMessage, hookResult);
+    }
+}
diff --git 
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPublishHandler.java
 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPublishHandler.java
new file mode 100644
index 0000000..8ab6324
--- /dev/null
+++ 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/protocol/mqtt/handler/TestMqttPublishHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.test.protocol.mqtt.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.common.model.Remark;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+import org.apache.rocketmq.mqtt.cs.protocol.mqtt.handler.MqttPublishHandler;
+import org.apache.rocketmq.mqtt.cs.session.infly.InFlyCache;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMqttPublishHandler {
+    private final String topicName = "testMqttPub";
+    private final int packetId = 666;
+    private MqttPublishVariableHeader variableHeader = new 
MqttPublishVariableHeader(topicName, packetId);
+    private MqttPublishHandler publishHandler = new MqttPublishHandler();
+
+    private MqttFixedHeader atMostHeader;
+    private MqttFixedHeader atLeastHeader;
+    private MqttFixedHeader exactlyHeader;
+
+    private MqttPublishMessage atMostPubMessage;
+    private MqttPublishMessage atLeastPubMessage;
+    private MqttPublishMessage exactlyPubMessage;
+
+    private HookResult failHook;
+    private HookResult successHook;
+
+    @Mock
+    private InFlyCache inFlyCache;
+
+    @Mock
+    private ChannelManager channelManager;
+
+    @Mock
+    private ChannelHandlerContext ctx;
+
+    @Spy
+    private NioSocketChannel channel;
+
+    @Before
+    public void setUp() throws Exception {
+        FieldUtils.writeDeclaredField(publishHandler, "inFlyCache", 
inFlyCache, true);
+        FieldUtils.writeDeclaredField(publishHandler, "channelManager", 
channelManager, true);
+
+        atMostHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, 
MqttQoS.AT_MOST_ONCE, false, 0);
+        atMostPubMessage = new MqttPublishMessage(atMostHeader, 
variableHeader, null);
+        atLeastHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, 
MqttQoS.AT_LEAST_ONCE, false, 0);
+        atLeastPubMessage = new MqttPublishMessage(atLeastHeader, 
variableHeader, null);
+        exactlyHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, 
MqttQoS.EXACTLY_ONCE, false, 0);
+        exactlyPubMessage = new MqttPublishMessage(exactlyHeader, 
variableHeader, null);
+
+        failHook = new HookResult(HookResult.FAIL, 
MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED.byteValue(),
+                Remark.INVALID_PARAM, null);
+        successHook = new HookResult(HookResult.SUCCESS, Remark.SUCCESS, null);
+
+        when(ctx.channel()).thenReturn(channel);
+        doReturn(null).when(channel).writeAndFlush(any());
+    }
+
+    @Test
+    public void testDoHandlerHookFail() {
+        publishHandler.doHandler(ctx, atMostPubMessage, failHook);
+
+        verify(ctx).channel();
+        verify(channelManager).closeConnect(eq(channel), 
eq(ChannelCloseFrom.SERVER), eq(Remark.INVALID_PARAM));
+        verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+    }
+
+    @Test
+    public void testDoHandlerAtMostOnce() {
+        publishHandler.doHandler(ctx, atMostPubMessage, successHook);
+
+        verify(ctx).channel();
+        verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+    }
+
+    @Test
+    public void testDoHandlerAtLeastOnce() {
+        publishHandler.doHandler(ctx, atLeastPubMessage, successHook);
+
+        verify(ctx, times(2)).channel();
+        verify(channel).writeAndFlush(any(MqttPubAckMessage.class));
+        verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+    }
+
+    @Test
+    public void testDoHandlerExactlyOnceCacheHit() {
+        doReturn(true).when(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), 
anyString(), eq(packetId));
+
+        publishHandler.doHandler(ctx, exactlyPubMessage, successHook);
+
+        verify(ctx, times(2)).channel();
+        verify(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), anyString(), 
eq(packetId));
+        verify(channel).writeAndFlush(any(MqttMessage.class));
+        verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+    }
+
+    @Test
+    public void testDoHandlerExactlyOnceCacheNotHit() {
+        
doReturn(false).when(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), 
anyString(), eq(packetId));
+
+        publishHandler.doHandler(ctx, exactlyPubMessage, successHook);
+
+        verify(ctx, times(2)).channel();
+        verify(inFlyCache).contains(eq(InFlyCache.CacheType.PUB), anyString(), 
eq(packetId));
+        verify(channel).writeAndFlush(any(MqttMessage.class));
+        verify(inFlyCache).put(eq(InFlyCache.CacheType.PUB), anyString(), 
eq(packetId));
+        verifyNoMoreInteractions(inFlyCache, channelManager, ctx);
+    }
+
+}
\ No newline at end of file

Reply via email to