This is an automated email from the ASF dual-hosted git repository.
pingww pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/develop by this push:
new b09b394 Fix bug: Ensure that the QoS of Payload Messages sent in
response to a Subscription is set to the minimum of the QoS of the original
message and the maximum QoS granted by the Server (MQTT-3.8.4-6)
new ae0e2f4 Merge pull request #222 from zzjcool/fix/sub-qos
b09b394 is described below
commit b09b394923c9f3f83fbb31724231b3fc19846140
Author: Chieh <[email protected]>
AuthorDate: Sat Nov 25 19:57:30 2023 +0800
Fix bug: Ensure that the QoS of Payload Messages sent in response to a
Subscription is set to the minimum of the QoS of the original message and the
maximum QoS granted by the Server (MQTT-3.8.4-6)
---
.../rocketmq/mqtt/cs/session/infly/PushAction.java | 2 +-
.../mqtt/cs/test/session/infly/TestPushAction.java | 29 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
index 70592a3..b85619f 100644
---
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
+++
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
@@ -109,7 +109,7 @@ public class PushAction {
}
int qos = subscription.getQos();
- if (subscription.isP2p() && message.qos() != null) {
+ if (message.qos() != null && (subscription.isP2p() || message.qos() <
qos)) {
qos = message.qos();
}
if (qos == 0) {
diff --git
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/session/infly/TestPushAction.java
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/session/infly/TestPushAction.java
index 5787df7..e0aaad0 100644
---
a/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/session/infly/TestPushAction.java
+++
b/mqtt-cs/src/test/java/org/apache/rocketmq/mqtt/cs/test/session/infly/TestPushAction.java
@@ -48,6 +48,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@@ -187,6 +188,34 @@ public class TestPushAction {
verify(spyPushAction).rollNextByAck(any(), anyInt());
}
+ @Test
+ public void testPushQos() {
+ subscription.setTopicFilter("/test/qos");
+ when(session.isClean()).thenReturn(false);
+ when(inFlyCache.getPendingDownCache()).thenReturn(new
InFlyCache().getPendingDownCache());
+
+ PushAction spyPushAction = spy(pushAction);
+ doNothing().when(spyPushAction).write(any(), any(), anyInt(),
anyInt(), any());
+ doNothing().when(spyPushAction).rollNextByAck(any(), anyInt());
+
+ when(message.qos()).thenReturn(1);
+ subscription.setQos(1);
+ spyPushAction.push(message, subscription, session, queue);
+ verify(spyPushAction).write(any(), any(), anyInt(), eq(1), any());
+
+ clearInvocations(spyPushAction);
+ when(message.qos()).thenReturn(1);
+ subscription.setQos(2);
+ spyPushAction.push(message, subscription, session, queue);
+ verify(spyPushAction).write(any(), any(), anyInt(), eq(1), any());
+
+ clearInvocations(spyPushAction);
+ when(message.qos()).thenReturn(3);
+ subscription.setQos(2);
+ spyPushAction.push(message, subscription, session, queue);
+ verify(spyPushAction).write(any(), any(), anyInt(), eq(2), any());
+ }
+
@Test
public void testWrite() {
doReturn(channelFuture).when(channel).writeAndFlush(any());