[ 
https://issues.apache.org/jira/browse/ARTEMIS-4681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825745#comment-17825745
 ] 

Justin Bertram edited comment on ARTEMIS-4681 at 3/12/24 5:14 PM:
------------------------------------------------------------------

[~Josh B], thanks for the test. It made diagnosing the issue much simpler. I 
simplified the test a bit to this:
{code:java}
package org.apache.activemq.artemis.tests.integration.server;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.HashMap;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;

public class StuckAddressTest extends ActiveMQTestBase {

   protected ActiveMQServer server;

   protected ClientSession session;

   protected ClientSessionFactory sf;

   protected ServerLocator locator;

   @Override
   @Before
   public void setUp() throws Exception {
      super.setUp();
      server = createServer(true, createDefaultNettyConfig(),
                            AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES,
                            -1, -1, new HashMap<>());
      server.start();
      locator = createInVMNonHALocator();
      sf = createSessionFactory(locator);
      session = addClientSession(sf.createSession(false, true, true));
   }

   @Test
   public void testConsumerKick() throws Exception {
      final SimpleString addressName = SimpleString.toSimpleString("myAddress");
      final int COUNT = 10;
      final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(
         server.getConfiguration().getInternalNamingPrefix(),
         server.getConfiguration().getWildcardConfiguration().getDelimiterString(),
         addressName, RoutingType.MULTICAST);
      server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings()
         .setRetroactiveMessageCount(COUNT)
         .setSlowConsumerPolicy(SlowConsumerPolicy.KILL)
         .setSlowConsumerThresholdMeasurementUnit(SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_MINUTE)
         .setSlowConsumerThreshold(1)
         .setSlowConsumerCheckPeriod(3));
      server.addAddressInfo(new AddressInfo(addressName).addRoutingType(RoutingType.MULTICAST));
      server.getConfiguration().setPersistenceEnabled(false);

      ConnectionFactory cf = new org.apache.activemq.ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");

      Connection c = cf.createConnection();
      Session s = c.createSession();
      Topic t = s.createTopic(addressName.toString());

      MessageProducer producer = s.createProducer(t);
      for (int i = 0; i < COUNT * 2; i++) {
         Message m = s.createMessage();
         m.setIntProperty("test", i);
         producer.send(m);
      }
      producer.close();
      Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == COUNT);

      MessageConsumer consumer = s.createConsumer(t);
      c.start();
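      for (int i = 0; i < COUNT; i++) {
         Message m = consumer.receive(500);
         assertNotNull(m);
         // Pause 4 seconds between receives; this is longer than the
         // 3 second slow-consumer check period configured above.
         Thread.sleep(4_000);
         assertEquals(i + COUNT, m.getIntProperty("test"));
      }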
      assertNull(consumer.receiveNoWait());
      c.close();
   }
}{code}
Technically speaking, the issue isn't directly related to the retroactive 
address or the fact that the consumer is de-duping messages. It's caused by a 
combination of the configuration and the behavior of the application. Simply 
put, {{setSlowConsumerCheckPeriod(3)}} together with {{Thread.sleep(4_000)}} in 
the consumer causes the broker to identify the consumer as "slow" and close its 
connection. If you used either {{setSlowConsumerCheckPeriod(5)}} *or* 
{{Thread.sleep(3_000)}}, the consumer would not get disconnected.

This behavior is based on how the slow detection algorithm is implemented. It's 
not terribly sophisticated: it doesn't use a rolling average or anything similar 
that would account for historical acknowledgements beyond the current time 
slice. In other words, the algorithm _only_ looks at the current time slice.

For example, in the case of this test, every {{3}} seconds the broker will look 
at how many messages were acknowledged _since the last time acks were checked_ 
(i.e. 3 seconds ago). If the rate of acks during that time slice is not above 
the configured threshold (i.e. 1 per minute or 0.0167 per second in this case) 
then the consumer will be disconnected. Since the client is only acknowledging 
a message every 4 seconds, eventually there will be a 3-second window with no 
acknowledgements at all, which will cause the broker to identify the consumer 
as slow and kill it.
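
To make the arithmetic concrete, here is a minimal sketch of the time-slice check described above. It is not the broker's actual code: the class name, method name, and the simplifications (acks land at exactly 0, 4, 8, ... seconds, windows are half-open, and checking stops at the last ack) are all assumptions made for illustration.

```java
// Hypothetical illustration only; this does not mirror the broker's real implementation.
final class SlowConsumerWindowSketch {

   /**
    * Simulates fixed, non-overlapping check windows and returns the 1-based
    * number of the first window whose ack rate is not above the threshold,
    * or -1 if no such window occurs before the last ack.
    *
    * Assumptions: ack k (k = 0, 1, 2, ...) arrives at k * ackIntervalSeconds,
    * and window n covers [(n - 1) * checkPeriodSeconds, n * checkPeriodSeconds).
    */
   static int firstSlowWindow(int checkPeriodSeconds, int ackIntervalSeconds,
                              double thresholdPerMinute, int totalAcks) {
      double thresholdPerSecond = thresholdPerMinute / 60.0;
      int lastAckTime = (totalAcks - 1) * ackIntervalSeconds;
      int window = 1;
      for (int start = 0; start + checkPeriodSeconds <= lastAckTime;
           start += checkPeriodSeconds, window++) {
         int end = start + checkPeriodSeconds;
         // Count only the acks inside the current slice; earlier acks are ignored.
         int acksInWindow = 0;
         for (int k = 0; k < totalAcks; k++) {
            int ackTime = k * ackIntervalSeconds;
            if (ackTime >= start && ackTime < end) {
               acksInWindow++;
            }
         }
         double rate = (double) acksInWindow / checkPeriodSeconds;
         if (rate <= thresholdPerSecond) {
            return window; // consumer would be flagged as slow here
         }
      }
      return -1;
   }

   public static void main(String[] args) {
      // Check period 3 s, ack every 4 s (the test's settings): prints 4
      System.out.println(firstSlowWindow(3, 4, 1.0, 10));
      // Check period raised to 5 s: prints -1
      System.out.println(firstSlowWindow(5, 4, 1.0, 10));
      // Ack interval lowered to 3 s: prints -1
      System.out.println(firstSlowWindow(3, 3, 1.0, 10));
   }
}
```

Under these assumptions the acks at 0, 4, and 8 seconds cover the first three windows, but the window from 9 to 12 seconds contains no ack, so the fourth check flags the consumer even though its overall rate (15 messages per minute) is far above the threshold.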

I think the slow detection algorithm could certainly be improved, but I 
wouldn't categorize this as a bug. The algorithm is working as it was designed 
as far as I can tell.



> Retroactive consumers get continuously kicked as a slow consumer if consumers 
> dedupe messages
> ---------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-4681
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4681
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.32.0
>            Reporter: Josh Byster
>            Priority: Minor
>
> I noticed while running a consumer on a retroactive topic and slow consumer 
> kicking with ActiveMQ Classic 5.16.5 (which has duplicate message detection) 
> that the broker gets into a vicious kicking loop:
> # I connect to Retoractive.Addr, which has retroactive message count set to 
> 100.
> # I receive the last 100 messages and continue to listen for new messages. 
> Everything is fine at this point.
> # I disconnect or get killed due to being a slow consumer.
> # I automatically reconnect to Retoractive.Addr and, since it's retroactive, 
> I receive the last 100 messages immediately.
> # I discard all of them due to having already seen them (after all, I just 
> reconnected, I myself did not restart)
> # Artemis sees this as a consumer that has not acked any messages (since they 
> were discarded) and immediately kills it.
> #  The cycle continues…
> It seems retroactive consumers are doing what they should, and client-side 
> message de-duplication is also working as expected, but I'm not sure whether 
> slow consumer kicking should be disabled for retroactive topics because of 
> this. I have not verified this against an Artemis client, just against a 
> Classic client where I've observed this behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
