This is an automated email from the ASF dual-hosted git repository.

pingww pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/develop by this push:
     new b09b394  Fix bug: Ensure that the QoS of Payload Messages sent in 
response to a Subscription is set to the minimum of the QoS of the original 
message and the maximum QoS granted by the Server (MQTT-3.8.4-6)
     new ae0e2f4  Merge pull request #222 from zzjcool/fix/sub-qos
b09b394 is described below

commit b09b394923c9f3f83fbb31724231b3fc19846140
Author: Chieh <[email protected]>
AuthorDate: Sat Nov 25 19:57:30 2023 +0800

    Fix bug: Ensure that the QoS of Payload Messages sent in response to a 
Subscription is set to the minimum of the QoS of the original message and the 
maximum QoS granted by the Server (MQTT-3.8.4-6)
---
 .../rocketmq/mqtt/cs/session/infly/PushAction.java |  2 +-
 .../mqtt/cs/test/session/infly/TestPushAction.java | 29 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
index 70592a3..b85619f 100644
--- 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
+++ 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
@@ -109,7 +109,7 @@ public class PushAction {
         }
 
         int qos = subscription.getQos();
-        if (subscription.isP2p() && message.qos() != null) {
+        if (message.qos() != null && (subscription.isP2p() || message.qos() < 
qos)) {
             qos = message.qos();
         }
         if (qos == 0) {
diff --git 
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/session/infly/TestPushAction.java
 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/session/infly/TestPushAction.java
index 5787df7..e0aaad0 100644
--- 
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/session/infly/TestPushAction.java
+++ 
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/session/infly/TestPushAction.java
@@ -48,6 +48,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
@@ -187,6 +188,34 @@ public class TestPushAction {
         verify(spyPushAction).rollNextByAck(any(), anyInt());
     }
 
+    @Test
+    public void testPushQos() {
+        subscription.setTopicFilter("/test/qos");
+        when(session.isClean()).thenReturn(false);
+        when(inFlyCache.getPendingDownCache()).thenReturn(new 
InFlyCache().getPendingDownCache());
+
+        PushAction spyPushAction = spy(pushAction);
+        doNothing().when(spyPushAction).write(any(), any(), anyInt(), 
anyInt(), any());
+        doNothing().when(spyPushAction).rollNextByAck(any(), anyInt());
+
+        when(message.qos()).thenReturn(1);
+        subscription.setQos(1);
+        spyPushAction.push(message, subscription, session, queue);
+        verify(spyPushAction).write(any(), any(), anyInt(), eq(1), any());
+
+        clearInvocations(spyPushAction);
+        when(message.qos()).thenReturn(1);
+        subscription.setQos(2);
+        spyPushAction.push(message, subscription, session, queue);
+        verify(spyPushAction).write(any(), any(), anyInt(), eq(1), any());
+
+        clearInvocations(spyPushAction);
+        when(message.qos()).thenReturn(3);
+        subscription.setQos(2);
+        spyPushAction.push(message, subscription, session, queue);
+        verify(spyPushAction).write(any(), any(), anyInt(), eq(2), any());
+    }
+
     @Test
     public void testWrite() {
         doReturn(channelFuture).when(channel).writeAndFlush(any());

Reply via email to