[ 
https://issues.apache.org/jira/browse/ARTEMIS-5251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17916483#comment-17916483
 ] 

Justin Bertram commented on ARTEMIS-5251:
-----------------------------------------

The problem here is that you're sending a _Core_ message directly to the _Core_ 
address. This effectively bypasses the logic to deal with MQTT retained 
messages which is encapsulated in the MQTT protocol implementation classes. The 
Core message is routed to the proper queues which is ostensibly why your MQTT 
subscriber receives it, but it is not "retained" for later dispatch to future 
MQTT subscribers. Retained messages are handled by 
{{org.apache.activemq.artemis.core.protocol.mqtt.MQTTRetainMessageManager}} 
which is tied to the corresponding MQTT session.

> the consumer after restart it does not receive the retained message
> -------------------------------------------------------------------
>
>                 Key: ARTEMIS-5251
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-5251
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.36.0
>            Reporter: gongping.zhu
>            Priority: Major
>
> When a device connected I send a retained connnect message in the plugin. The 
> consumer does receive the message with the retained flag. However, when I 
> restart the consumer it does not receive the retained message. Why is this? 
> Below is the code of my plugin.
> {code:java}
> ByteBuf payloadByteBuf = 
> Unpooled.wrappedBuffer(payload.getBytes(StandardCharsets.UTF_8));
> SimpleString address = SimpleString.of(topic);
> CoreMessage message = new 
> CoreMessage().initBuffer(payloadByteBuf.readableBytes()).setMessageID(serverRefRef.get().getStorageManager().generateID());
> message.putBooleanProperty(MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY,true);
> message.putBooleanProperty(MQTTUtil.MQTT_MESSAGE_RETAIN_KEY, true);
> message.putIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, qosLevel);
> message.putStringProperty(STATUS_FROM, from);
> message.setType(Message.BYTES_TYPE);
> message.setRoutingType(RoutingType.MULTICAST);
> message.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, clientId);
> AddressInfo addressInfo = serverRefRef.get().getAddressInfo(address);
> if (addressInfo != null) {
> message.setRoutingType(addressInfo.getRoutingType());
> }
> message.setAddress(address);
> message.getBodyBuffer().writeBytes(payloadByteBuf, 0, 
> payloadByteBuf.readableBytes());
> session.send(tx,message,true,name,false);{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to