[
https://issues.apache.org/jira/browse/ARTEMIS-4681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825745#comment-17825745
]
Justin Bertram edited comment on ARTEMIS-4681 at 3/12/24 5:14 PM:
------------------------------------------------------------------
[~Josh B], thanks for the test. It made diagnosing the issue much simpler. I
simplified the test a bit to this:
{code:java}
package org.apache.activemq.artemis.tests.integration.server;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.HashMap;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;

public class StuckAddressTest extends ActiveMQTestBase {

   protected ActiveMQServer server;
   protected ClientSession session;
   protected ClientSessionFactory sf;
   protected ServerLocator locator;

   @Override
   @Before
   public void setUp() throws Exception {
      super.setUp();
      // Netty-based config so the TCP client in the test can connect.
      server = createServer(true, createDefaultNettyConfig(), AddressSettings.DEFAULT_PAGE_SIZE,
                            AddressSettings.DEFAULT_MAX_SIZE_BYTES, -1, -1, new HashMap<>());
      server.start();
      locator = createInVMNonHALocator();
      sf = createSessionFactory(locator);
      session = addClientSession(sf.createSession(false, true, true));
   }

   @Test
   public void testConsumerKick() throws Exception {
      final SimpleString addressName = SimpleString.toSimpleString("myAddress");
      final int COUNT = 10;
      // Internal queue that holds the retroactive messages for the address.
      final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(
         server.getConfiguration().getInternalNamingPrefix(),
         server.getConfiguration().getWildcardConfiguration().getDelimiterString(),
         addressName, RoutingType.MULTICAST);

      // Keep the last COUNT messages; kill consumers acking fewer than 1 msg/minute,
      // checked every 3 seconds.
      server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings()
         .setRetroactiveMessageCount(COUNT)
         .setSlowConsumerPolicy(SlowConsumerPolicy.KILL)
         .setSlowConsumerThresholdMeasurementUnit(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_MINUTE)
         .setSlowConsumerThreshold(1)
         .setSlowConsumerCheckPeriod(3));
      server.addAddressInfo(new AddressInfo(addressName).addRoutingType(RoutingType.MULTICAST));
      server.getConfiguration().setPersistenceEnabled(false);

      // ActiveMQ Classic (OpenWire) client, as in the original report.
      ConnectionFactory cf = new org.apache.activemq.ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
      Connection c = cf.createConnection();
      Session s = c.createSession();
      Topic t = s.createTopic(addressName.toString());

      // Send 2 * COUNT messages; only the last COUNT remain in the retroactive queue.
      MessageProducer producer = s.createProducer(t);
      for (int i = 0; i < COUNT * 2; i++) {
         Message m = s.createMessage();
         m.setIntProperty("test", i);
         producer.send(m);
      }
      producer.close();
      Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == COUNT);

      // A new subscriber first receives the retroactive messages (COUNT .. 2 * COUNT - 1).
      MessageConsumer consumer = s.createConsumer(t);
      c.start();
      for (int i = 0; i < COUNT; i++) {
         Message m = consumer.receive(500);
         assertNotNull(m);
         // Sleeping longer than the 3-second check period leaves some check windows with no acks.
         Thread.sleep(4_000);
         assertEquals(i + COUNT, m.getIntProperty("test"));
      }
      assertNull(consumer.receiveNoWait());
      c.close();
   }
}{code}
Technically speaking, the issue isn't directly related to the retroactive
address or to the fact that the consumer is de-duping messages. It's caused by
a combination of the configuration and the behavior of the application. Simply
put, {{setSlowConsumerCheckPeriod(3)}} together with the {{Thread.sleep(4_000)}}
in the consumer causes the broker to identify the consumer as "slow" and close
its connection. If you used {{setSlowConsumerCheckPeriod(5)}} *or*
{{Thread.sleep(3_000)}} instead, the consumer would not get disconnected.
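The claim that either change avoids the disconnect can be checked with a little arithmetic. Write I for the interval between acknowledgements and P for the check period (symbols introduced here for illustration), and assume perfectly regular timing. That is an idealization: real acks land slightly later than the sleep because receiving and acknowledging take time, so an interval exactly equal to the period is the borderline case. Under that assumption, any check window of length P contains either the floor or the ceiling of P/I acknowledgements:

```latex
\begin{aligned}
P = 3,\ I = 4 &: \lfloor 3/4 \rfloor = 0,\ \lceil 3/4 \rceil = 1 \;\Rightarrow\; \text{some windows see no acks} \\
P = 5,\ I = 4 &: \lfloor 5/4 \rfloor = 1,\ \lceil 5/4 \rceil = 2 \;\Rightarrow\; \text{every window sees at least one ack} \\
P = 3,\ I = 3 &: 3/3 = 1 \;\Rightarrow\; \text{every window sees exactly one ack}
\end{aligned}
```

In the first case there are also more windows than acks over any long run (T/3 versus T/4 for a run of length T), so some windows must be empty. In the other two, every window holds at least one ack, i.e. a rate of at least 1/5 = 0.2 per second, far above the 1/60 ≈ 0.0167 per second threshold.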
This behavior follows from how the slow-consumer detection algorithm is
implemented. It's not terribly sophisticated: it doesn't use a rolling average
or anything similar that accounts for acknowledgements beyond the current time
slice. In other words, the algorithm _only_ looks at the current time slice.
In this test, for example, every {{3}} seconds the broker looks at how many
messages were acknowledged _since the last time acks were checked_ (i.e. 3
seconds ago). If the resulting ack rate for that time slice is below the
configured threshold (i.e. 1 per minute, or about 0.0167 per second, in this
case), the consumer is disconnected. Since the client only acknowledges a
message every 4 seconds, eventually there will be a 3-second window with no
acknowledgements, which causes the broker to identify the consumer as slow and
kill it.
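A minimal sketch of that slice-only check follows. It assumes perfectly regular timing: the consumer acknowledges one message at t = 0, 4 s, 8 s, ... and the broker checks at 3 s, 6 s, 9 s, .... The class {{SliceOnlyCheck}} and its method are hypothetical and mirror the description above, not Artemis's source. In the real broker the check schedule is not aligned with the consumer, so the exact time of the kill will differ; the point is that an empty slice eventually comes around.

```java
/**
 * Hypothetical model of a slow-consumer check that only looks at the current time
 * slice, following the description above (this is not Artemis's source code).
 * Idealized timing: the consumer acks one message at t = 0, ackInterval, 2*ackInterval, ...
 * and the broker checks at t = checkPeriod, 2*checkPeriod, ...
 */
class SliceOnlyCheck {

   /** Returns the time (ms) of the first check that flags the consumer as slow, or -1 if none does. */
   static long firstKillMs(long checkPeriodMs, long ackIntervalMs, int ackCount, double thresholdPerMinute) {
      double thresholdPerSecond = thresholdPerMinute / 60.0; // 1 per minute is about 0.0167 per second
      double sliceSeconds = checkPeriodMs / 1000.0;
      long lastAckMs = ackIntervalMs * (ackCount - 1);
      long snapshot = 0; // ack total recorded at the previous check
      for (long now = checkPeriodMs; now <= lastAckMs; now += checkPeriodMs) {
         long acked = Math.min(now / ackIntervalMs + 1, ackCount); // acks with time <= now
         double rate = (acked - snapshot) / sliceSeconds;          // only this slice counts
         snapshot = acked;                                          // older history is forgotten
         if (rate < thresholdPerSecond) {
            return now;
         }
      }
      return -1;
   }

   public static void main(String[] args) {
      // Settings from the test: threshold 1 msg/minute, 10 messages, one ack every 4 s.
      long[] periods = {3_000, 5_000};
      for (long period : periods) {
         long kill = firstKillMs(period, 4_000, 10, 1.0);
         System.out.println("check every " + period / 1000 + " s: "
               + (kill < 0 ? "never flagged" : "flagged at " + kill + " ms"));
      }
      // prints "check every 3 s: flagged at 15000 ms" then "check every 5 s: never flagged"
   }
}
```

With a 3-second period, the slice ending at 15 s contains no acknowledgement (the acks fall at 12 s and 16 s), so the computed rate is 0 and the consumer is flagged even though it has been acknowledging steadily. With a 5-second period, every slice contains at least one ack.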
I think the slow-consumer detection algorithm could certainly be improved, but
I wouldn't categorize this as a bug. As far as I can tell, the algorithm is
working as designed.
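For comparison, here is a sketch of the rolling-average idea mentioned above. It is a hypothetical alternative, *not* how Artemis currently behaves: the consumer is judged on the ack rate averaged over the last few check slices instead of the current slice alone. The class name, the {{slicesInWindow}} parameter, and the idealized timing (one ack every 4 s starting at t = 0, a check every 3 s) are all assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical alternative, NOT how Artemis currently works: judge the consumer on the
 * ack rate averaged over the last few check slices rather than the current slice alone.
 * Idealized timing: one ack at t = 0, ackInterval, 2*ackInterval, ...; a check every checkPeriod.
 */
class RollingRateCheck {

   /** Returns the time (ms) of the first check that flags the consumer as slow, or -1. */
   static long firstKillMs(long checkPeriodMs, long ackIntervalMs, int ackCount,
                           double thresholdPerMinute, int slicesInWindow) {
      double thresholdPerSecond = thresholdPerMinute / 60.0;
      long lastAckMs = ackIntervalMs * (ackCount - 1);
      Deque<Long> recentSlices = new ArrayDeque<>(); // ack counts of the most recent slices
      long windowAcks = 0; // sum of the counts currently in recentSlices
      long snapshot = 0;   // ack total at the previous check
      for (long now = checkPeriodMs; now <= lastAckMs; now += checkPeriodMs) {
         long acked = Math.min(now / ackIntervalMs + 1, ackCount); // acks with time <= now
         long sliceAcks = acked - snapshot;
         snapshot = acked;
         recentSlices.addLast(sliceAcks);
         windowAcks += sliceAcks;
         if (recentSlices.size() > slicesInWindow) {
            windowAcks -= recentSlices.removeFirst(); // drop the oldest slice
         }
         double windowSeconds = recentSlices.size() * checkPeriodMs / 1000.0;
         if (windowAcks / windowSeconds < thresholdPerSecond) {
            return now;
         }
      }
      return -1;
   }

   public static void main(String[] args) {
      // Same settings as the test: 3 s checks, 1 msg/minute threshold, an ack every 4 s.
      System.out.println(firstKillMs(3_000, 4_000, 10, 1.0, 1)); // 15000: one slice = slice-only behavior
      System.out.println(firstKillMs(3_000, 4_000, 10, 1.0, 5)); // -1: a 15 s window always holds acks
   }
}
```

The trade-off is responsiveness: with five 3-second slices and this low threshold, a consumer that stops acknowledging entirely is only flagged once all five slices are empty, roughly 15 to 18 seconds after its last ack instead of 3 to 6.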
> Retroactive consumers get continuously kicked as a slow consumer if consumers
> dedupe messages
> ---------------------------------------------------------------------------------------------
>
> Key: ARTEMIS-4681
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4681
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.32.0
> Reporter: Josh Byster
> Priority: Minor
>
> I noticed, while running a consumer with the ActiveMQ Classic 5.16.5 client
> (which has duplicate message detection) on a retroactive topic with slow
> consumer kicking enabled, that the broker gets into a vicious kicking loop:
> # I connect to Retroactive.Addr, which has its retroactive message count set
> to 100.
> # I receive the last 100 messages and continue to listen for new messages.
> Everything is fine at this point.
> # I disconnect or get killed due to being a slow consumer.
> # I automatically reconnect to Retroactive.Addr and, since it's retroactive,
> I receive the last 100 messages immediately.
> # I discard all of them because I've already seen them (after all, I only
> reconnected; the client itself did not restart).
> # Artemis sees this as a consumer that has not acked any messages (since they
> were discarded) and immediately kills it.
> # The cycle continues…
> It seems retroactive consumers are doing what they should and client-side
> message de-duplication is also working as expected, but I'm not sure whether
> slow consumer kicking should be disabled for retroactive topics because of
> this. I have not verified this against an Artemis client, only against a
> Classic client, where I've observed this behavior.
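The loop in the report can be modeled in a few lines. This is a hypothetical sketch of the reporter's hypothesis, not Artemis or Classic client code, and the analysis in the comment above found the disconnects reproducible without de-duplication. The assumptions come from the report: the client keeps its set of seen message IDs across reconnects, discarded duplicates are never acknowledged, and the broker kicks a consumer that acknowledged nothing in the current check slice. No new messages are published in the model.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical model of the kick loop described in the report (not Artemis or Classic code).
 * Assumptions from the report: seen IDs survive reconnects, discarded duplicates are never
 * acknowledged, and a consumer with zero acks in the current check slice is kicked.
 */
class RetroactiveKickLoop {

   /** Returns how many messages were acknowledged on each connect attempt. */
   static int[] acksPerConnect(int retroactiveCount, int connects) {
      Set<Integer> seen = new HashSet<>(); // survives reconnects: the client process never restarts
      int[] acks = new int[connects];
      for (int attempt = 0; attempt < connects; attempt++) {
         // On every (re)connect the broker replays the same last N messages (IDs 0..N-1 here).
         for (int id = 0; id < retroactiveCount; id++) {
            if (seen.add(id)) {
               acks[attempt]++; // first sighting: processed and acknowledged
            }
            // otherwise: duplicate, discarded without an ack (per the report)
         }
      }
      return acks;
   }

   public static void main(String[] args) {
      int[] acks = acksPerConnect(100, 3);
      for (int i = 0; i < acks.length; i++) {
         String verdict = acks[i] == 0 ? "kicked as slow" : "ok";
         System.out.println("connect " + (i + 1) + ": acked " + acks[i] + ", " + verdict);
      }
   }
}
```

Running it prints 100 acks for the first connect and 0 for every reconnect, which is the point at which a zero-ack slice would trigger the kick and restart the cycle.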