gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761958322
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
##########
@@ -612,6 +612,45 @@ public void testMinuteSurviving() throws Exception {
Wait.assertEquals(0, queue::getConsumerCount);
}
+
+ @Test
+ public void testKilledOnNoMessagesSoCanBeRebalanced() throws Exception {
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setSlowConsumerCheckPeriod(2);
+
addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_SECOND);
+ addressSettings.setSlowConsumerThreshold(0); // if there are no messages
pending, kill me
+ addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
+
+ server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
+ server.getAddressSettingsRepository().addMatch(QUEUE.toString(),
addressSettings);
+
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = addClientSession(sf.createSession(false, true,
true, false));
+
+ ClientProducer producer =
addClientProducer(session.createProducer(QUEUE));
+
+ int messages = 1;
+
+ for (int i = 0; i < messages; i++) {
+ producer.send(session.createMessage(true));
+ }
+ session.commit();
+
+ ConcurrentHashSet<ClientMessage> receivedMessages = new
ConcurrentHashSet<>();
+ FixedRateConsumer consumer = new FixedRateConsumer(10,
MESSAGES_PER_SECOND, receivedMessages, sf, QUEUE, 0);
+ consumer.start();
+
+ Queue queue = server.locateQueue(QUEUE);
+ Wait.assertEquals(1, queue::getConsumerCount);
Review comment:
Is the concern that the queue may not yet be present when `server.locateQueue(QUEUE)` is called?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]