jbertram commented on a change in pull request #3907:
URL: https://github.com/apache/activemq-artemis/pull/3907#discussion_r792927791



##########
File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
##########
@@ -284,11 +358,110 @@ private void sendServerMessage(int messageId, 
ICoreMessage message, int delivery
             payload.writeBytes(bodyBuffer.byteBuf());
             break;
       }
-      session.getProtocolHandler().send(messageId, address, qos, isRetain, 
payload, deliveryCount);
+
+      // [MQTT-3.3.1-2] The DUP flag MUST be set to 0 for all QoS 0 messages.
+      boolean redelivery = qos == 0 ? false : (deliveryCount > 1);
+
+      boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
+      MqttProperties mqttProperties = getPublishProperties(message);
+
+      if (session.is5()) {
+         if (session.getState().getSubscription(message.getAddress()) != null 
&& 
!session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished())
 {
+            isRetain = false;
+         }
+
+         // [MQTT-3.8.3-3] remove property used for no-local implementation
+         message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
+
+         if (session.getState().getClientTopicAliasMaximum() != null) {
+            Integer alias = session.getState().getServerTopicAlias(address);
+            if (alias == null) {
+               alias = session.getState().addServerTopicAlias(address);
+               if (alias != null) {
+                  mqttProperties.add(new 
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
+               }
+            } else {
+               mqttProperties.add(new 
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
+               address = "";
+            }
+         }
+      }
+
+      int remainingLength = MQTTUtil.calculateRemainingLength(address, 
mqttProperties, payload);
+      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, 
redelivery, MqttQoS.valueOf(qos), isRetain, remainingLength);
+      MqttPublishVariableHeader varHeader = new 
MqttPublishVariableHeader(address, messageId, mqttProperties);
+      MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, 
payload);
+
+      if (session.is5()) {
+         int size = MQTTUtil.calculateMessageSize(publish);

Review comment:
       Good idea. I'll push that change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to