jbertram commented on a change in pull request #3907:
URL: https://github.com/apache/activemq-artemis/pull/3907#discussion_r792927791
##########
File path:
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
##########
@@ -284,11 +358,110 @@ private void sendServerMessage(int messageId, ICoreMessage message, int delivery
payload.writeBytes(bodyBuffer.byteBuf());
break;
}
- session.getProtocolHandler().send(messageId, address, qos, isRetain, payload, deliveryCount);
+
+ // [MQTT-3.3.1-2] The DUP flag MUST be set to 0 for all QoS 0 messages.
+ boolean redelivery = qos == 0 ? false : (deliveryCount > 1);
+
+ boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
+ MqttProperties mqttProperties = getPublishProperties(message);
+
+ if (session.is5()) {
+ if (session.getState().getSubscription(message.getAddress()) != null && !session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished()) {
+ isRetain = false;
+ }
+
+ // [MQTT-3.8.3-3] remove property used for no-local implementation
+ message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
+
+ if (session.getState().getClientTopicAliasMaximum() != null) {
+ Integer alias = session.getState().getServerTopicAlias(address);
+ if (alias == null) {
+ alias = session.getState().addServerTopicAlias(address);
+ if (alias != null) {
+ mqttProperties.add(new
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
+ }
+ } else {
+ mqttProperties.add(new
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
+ address = "";
+ }
+ }
+ }
+
+ int remainingLength = MQTTUtil.calculateRemainingLength(address,
mqttProperties, payload);
+ MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH,
redelivery, MqttQoS.valueOf(qos), isRetain, remainingLength);
+ MqttPublishVariableHeader varHeader = new
MqttPublishVariableHeader(address, messageId, mqttProperties);
+ MqttPublishMessage publish = new MqttPublishMessage(header, varHeader,
payload);
+
+ if (session.is5()) {
+ int size = MQTTUtil.calculateMessageSize(publish);
Review comment:
Good idea. I'll push that change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org