AR created AMQ-5390:
-----------------------
Summary: MQTT pending durable subscriber messages are not
delievered after broker restart
Key: AMQ-5390
URL: https://issues.apache.org/jira/browse/AMQ-5390
Project: ActiveMQ
Issue Type: Bug
Components: MQTT
Affects Versions: 5.11.0
Reporter: AR
If there are pending messages to be delivered to a subscriber and if the broker
is restarted at this point, the pending messages are not delivered to the
subscriber when it connects after broker restart.
I modified existing test case testReceiveMessageSentWhileOffline() and added
test case testReceiveMessageSentWhileOfflineAndBrokerRestart() shown below:
changes:
* use standalone broker as I was not sure if embedded broker persists messages
on permanent store.
* manually need to restart when test prompts to restart broker
{noformat}
@Test(timeout = 60 * 1000)
public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws
Exception {
final byte[] payload = new byte[1024 * 32];
for (int i = 0; i < payload.length; i++) {
payload[i] = '2';
}
int numberOfRuns = 100;
int messagesPerRun = 2;
final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
mqttPub.setHost("tcp://localhost:1883");
mqttSub.setHost("tcp://localhost:1883");
final BlockingConnection connectionPub = mqttPub.blockingConnection();
connectionPub.connect();
BlockingConnection connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
connectionSub.subscribe(topics);
for (int i = 0; i < messagesPerRun; ++i) {
connectionPub.publish(topics[0].name().toString(), payload,
QoS.AT_LEAST_ONCE, false);
}
int received = 0;
for (int i = 0; i < messagesPerRun; ++i) {
Message message = connectionSub.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
received++;
assertTrue(Arrays.equals(payload, message.getPayload()));
message.ack();
}
connectionSub.disconnect();
for (int j = 0; j < numberOfRuns; j++) {
for (int i = 0; i < messagesPerRun; ++i) {
connectionPub.publish(topics[0].name().toString(), payload,
QoS.AT_LEAST_ONCE, false);
}
System.out.println("Restart broker here.....");
Thread.sleep(30000);
connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
connectionSub.subscribe(topics);
for (int i = 0; i < messagesPerRun; ++i) {
Message message = connectionSub.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
received++;
assertTrue(Arrays.equals(payload, message.getPayload()));
message.ack();
}
connectionSub.disconnect();
}
assertEquals("Should have received " + (messagesPerRun * (numberOfRuns
+ 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
}
{noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)