[ https://issues.apache.org/jira/browse/ARTEMIS-4476?focusedWorklogId=889729&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-889729 ]

ASF GitHub Bot logged work on ARTEMIS-4476:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Nov/23 13:05
            Start Date: 09/Nov/23 13:05
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4656:
URL: https://github.com/apache/activemq-artemis/pull/4656#discussion_r1387958178


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java:
##########
@@ -937,6 +937,7 @@ public void testUnhandledRemoteReceiverCloseConditionCausesConnectionRebuild() t
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
             peer.expectDetach().optional(); // Broker is not consistent on sending the detach
+            peer.expectClose().optional();

Review Comment:
   Ditto



##########
tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionDroppedLeakTest.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.leak;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.ServerStatus;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.activemq.artemis.tests.leak.MemoryAssertions.assertMemory;
+import static org.apache.activemq.artemis.tests.leak.MemoryAssertions.basicMemoryAsserts;
+
+public class ConnectionDroppedLeakTest extends ActiveMQTestBase {
+
+   private ConnectionFactory createConnectionFactory(String protocol) {
+      if (protocol.equals("AMQP")) {
+         return CFUtil.createConnectionFactory("AMQP", "amqp://localhost:61616?amqp.idleTimeout=120000&failover.maxReconnectAttempts=1&jms.prefetchPolicy.all=10&jms.forceAsyncAcks=true");
+      } else {
+         return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+      }
+   }
+
+   private static final String QUEUE_NAME = "QUEUE_DROP";
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   ActiveMQServer server;
+
+   Queue serverQueue;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+      Assume.assumeTrue(CheckLeak.isLoaded());
+   }
+
+   @After
+   public void validateServer() throws Exception {
+      CheckLeak checkLeak = new CheckLeak();
+
+      // I am doing this check here because the test method might hold a client connection
+      // so this check has to be done after the test, and before the server is stopped
+      assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName());
+
+      server.stop();
+
+      server = null;
+      serverQueue = null;
+
+      clearServers();
+      ServerStatus.clear();
+
+      assertMemory(checkLeak, 0, ActiveMQServerImpl.class.getName());
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      server = createServer(true, createDefaultConfig(1, true));
+      server.getConfiguration().setJournalPoolFiles(4).setJournalMinFiles(2);
+      server.start();
+      server.addAddressInfo(new AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
+
+      serverQueue = server.createQueue(new QueueConfiguration().setAddress(QUEUE_NAME).setName(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+   }
+
+   @Test
+   public void testDropConnectionsAMQP() throws Exception {
+      doDropConnections("AMQP");
+   }
+
+   @Test
+   public void testDropConnectionsCORE() throws Exception {
+      doDropConnections("CORE");
+   }
+
+   @Test
+   public void testDropConnectionsOPENWIRE() throws Exception {
+      doDropConnections("OPENWIRE");
+   }
+
+   private void doDropConnections(String protocol) throws Exception {
+      basicMemoryAsserts();
+
+      CountDownLatch latchDone = new CountDownLatch(2);
+      CountDownLatch latchReceived = new CountDownLatch(50);
+      AtomicInteger errors = new AtomicInteger(2);
+      AtomicBoolean running = new AtomicBoolean(true);
+
+      ExecutorService executorService = Executors.newFixedThreadPool(2);
+      runAfter(executorService::shutdownNow);
+      runAfter(() -> running.set(false));
+
+      executorService.execute(() -> {
+         ConnectionFactory cf = createConnectionFactory(protocol);
+         Connection connection = null;
+         try {
+            connection = cf.createConnection(); // I will leave this open on purpose
+            connection.start();
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            connection.start();
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
+            while (running.get()) {
+               Message message = consumer.receive(100);
+               if (message != null) {
+                  latchReceived.countDown();
+                  session.commit();
+               }
+            }
+         } catch (Exception e) {
+            errors.incrementAndGet();
+         } finally {
+            if (protocol.equals("OPENWIRE")) {
+               try {
+                  connection.close();// only closing the openwire as it would leave a hanging thread
+               } catch (Throwable ignored) {
+               }
+            }
+            latchDone.countDown();
+         }
+      });
+
+      executorService.execute(() -> {
+         ConnectionFactory cf = createConnectionFactory(protocol);
+         Connection connection = null;
+         try {
+            connection = cf.createConnection(); // I will leave this open on purpose
+            connection.start();
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            connection.start();

Review Comment:
   ditto



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java:
##########
@@ -219,6 +219,7 @@ public void testFederationCreatesControlLinkAndClosesConnectionDetachIndicatesNo
                                        .withNullTarget();
          peer.remoteDetach().withErrorCondition("amqp:unauthorized-access", "Not authroized").queue();
          peer.expectDetach().optional();
+         peer.expectClose().optional();

Review Comment:
   It seems off to make receiving the close optional in a test that is explicitly meant to check that the connection is closed.
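A toy model of the concern: if the close expectation is optional, a broker that detaches the link but never closes the connection still satisfies the script, so the test cannot catch a missing close. The `OptionalExpectationDemo` and `Expectation` names are hypothetical, and the simple in-order matching rule is an assumption for illustration, not the protonj2 test driver's actual implementation.

```java
import java.util.List;

// Toy model of a scripted AMQP peer (hypothetical; not the protonj2 test driver API).
class OptionalExpectationDemo {

   // One scripted expectation: the frame type expected next, and whether it may be absent.
   static final class Expectation {
      final String frame;
      final boolean optional;

      Expectation(String frame, boolean optional) {
         this.frame = frame;
         this.optional = optional;
      }
   }

   // True if the received frames satisfy the script in order. An optional expectation is
   // skipped when the next received frame does not match it; a required one must match.
   static boolean satisfied(List<Expectation> script, List<String> received) {
      int next = 0;
      for (Expectation e : script) {
         if (next < received.size() && received.get(next).equals(e.frame)) {
            next++; // matched: consume this frame
         } else if (!e.optional) {
            return false; // a required frame is missing or out of order
         }
      }
      return next == received.size(); // every received frame was expected
   }

   public static void main(String[] args) {
      // Suppose the broker detaches the link but never closes the connection.
      List<String> received = List.of("DETACH");

      List<Expectation> optionalClose = List.of(new Expectation("DETACH", true), new Expectation("CLOSE", true));
      List<Expectation> requiredClose = List.of(new Expectation("DETACH", true), new Expectation("CLOSE", false));

      System.out.println(satisfied(optionalClose, received)); // true: the missing close goes unnoticed
      System.out.println(satisfied(requiredClose, received)); // false: the missing close fails the check
   }
}
```

With the close required, the same script still passes when the broker does send `DETACH` followed by `CLOSE`, so the test keeps its purpose without tolerating a missing close.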



##########
tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionDroppedLeakTest.java:
##########
@@ -0,0 +1,205 @@
+      executorService.execute(() -> {
+         ConnectionFactory cf = createConnectionFactory(protocol);
+         Connection connection = null;
+         try {
+            connection = cf.createConnection(); // I will leave this open on purpose
+            connection.start();
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            connection.start();

Review Comment:
   duplicate connection start
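The second `connection.start()` is redundant rather than harmful: per the JMS `Connection.start()` contract, a call on an already started connection is ignored. For reference, here is a JDK-only sketch of the consumer thread's intended shape, with one-time setup followed by a flag-controlled receive loop. A `BlockingQueue` stands in for the JMS consumer (an assumption for illustration; no JMS types are used), and `ConsumerLoopSketch` is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ConsumerLoopSketch {

   static Runnable consumerLoop(BlockingQueue<String> source, AtomicBoolean running, CountDownLatch received) {
      return () -> {
         // One-time setup belongs here, before the loop: in the JMS test this is where a single
         // connection.start() (plus session and consumer creation) would happen.
         try {
            while (running.get()) {
               // poll with a timeout plays the role of consumer.receive(100)
               String message = source.poll(100, TimeUnit.MILLISECONDS);
               if (message != null) {
                  received.countDown(); // the JMS version commits the session here
               }
            }
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // executorService.shutdownNow() interrupts the worker
         }
      };
   }

   public static void main(String[] args) throws InterruptedException {
      BlockingQueue<String> queue = new LinkedBlockingQueue<>();
      AtomicBoolean running = new AtomicBoolean(true);
      CountDownLatch received = new CountDownLatch(50);
      ExecutorService executor = Executors.newSingleThreadExecutor();
      executor.execute(consumerLoop(queue, running, received));
      for (int i = 0; i < 50; i++) {
         queue.add("message-" + i);
      }
      System.out.println("all received: " + received.await(5, TimeUnit.SECONDS));
      running.set(false); // same role as runAfter(() -> running.set(false)) in the test
      executor.shutdown();
      System.out.println("worker stopped: " + executor.awaitTermination(5, TimeUnit.SECONDS));
   }
}
```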



##########
tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java:
##########
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.isolated.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.ThreadDumpUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionDroppedTest extends ActiveMQTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public ConnectionDroppedTest() {
+      // this is the reason why I'm putting this test on the "isolated" package.
+      disableCheckThread();
+   }
+
+   @Test(timeout = 20_000)
+   public void testConsumerDroppedWithProtonTestClient() throws Exception {
+      int NUMBER_OF_CONNECTIONS = 100;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         executorService.execute(() -> {
+            try (ProtonTestClient peer = new ProtonTestClient()) {
+               peer.queueClientSaslAnonymousConnect();
+               peer.remoteOpen().queue();
+               peer.expectOpen();
+               peer.remoteBegin().queue();
+               peer.expectBegin();
+               peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list", "amqp:rejected:list").also().queue();
+               peer.dropAfterLastHandler(1000); // This closes the netty connection after the attach is written
+               peer.connect("localhost", 61616);
+
+               // Waits for all the commands to fire and the drop action to be run.
+               peer.waitForScriptToComplete();
+            } catch (Throwable e) {
+               errors.incrementAndGet();
+               logger.warn(e.getMessage(), e);
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, errors.get());
+
+      Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+   }
+
+   @Test(timeout = 20_000)
+   public void testRegularClose() throws Exception {
+      int NUMBER_OF_CONNECTIONS = 100;
+      int REPEATS = 10;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+      AtomicInteger errors = new AtomicInteger(0);
+      AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
+      runAfter(loggerHandler::stop);
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         executorService.execute(() -> {
+            for (int r = 0; r < REPEATS; r++) {
+               try (ProtonTestClient peer = new ProtonTestClient()) {
+                  peer.queueClientSaslAnonymousConnect();
+                  peer.remoteOpen().queue();
+                  peer.expectOpen();
+                  peer.remoteBegin().queue();
+                  peer.expectBegin();
+                  peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list", "amqp:rejected:list").also().queue();
+                  peer.expectAttach();
+                  peer.remoteClose().queue();
+                  peer.expectClose();
+
+                  peer.connect("localhost", 61616);
+
+                  peer.waitForScriptToComplete();
+               } catch (Throwable e) {
+                  errors.incrementAndGet();
+                  logger.warn(e.getMessage(), e);
+                  break;
+               }
+            }
+            done.countDown();
+         });
+      }
+
+      Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, errors.get());
+
+      Assert.assertFalse(loggerHandler.findText("AMQ212037"));
+
+      // TODO: Fix these as part of ARTEMIS-4483
+      /*Assert.assertFalse(loggerHandler.findText("Connection failure"));
+      Assert.assertFalse(loggerHandler.findText("REMOTE_DISCONNECT"));
+      Assert.assertFalse(loggerHandler.findText("AMQ222061"));
+      Assert.assertFalse(loggerHandler.findText("AMQ222107")); */
+
+      Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+      Wait.assertEquals(0, server::getConnectionCount, 5000);
+
+   }
+
+   @Test
+   public void testConsumerDroppedAMQP() throws Throwable {
+      testConsumerDroppedWithRegularClient("AMQP");
+
+   }
+
+   @Test
+   public void testConsumerDroppedCORE() throws Throwable {
+      testConsumerDroppedWithRegularClient("CORE");
+   }
+
+   @Test
+   public void testConsumerDroppedOpenWire() throws Throwable {
+      testConsumerDroppedWithRegularClient("OPENWIRE");
+   }
+
+   public void testConsumerDroppedWithRegularClient(final String protocol) throws Throwable {
+      int NUMBER_OF_CONNECTIONS = 25;
+      int REPEATS = 10;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+
+      final AtomicBoolean running = new AtomicBoolean(true);
+
+      runAfter(() -> running.set(false));
+
+      CyclicBarrier flagStart = new CyclicBarrier(NUMBER_OF_CONNECTIONS + 1);
+      flagStart.reset();
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         final int t = i;
+         executorService.execute(() -> {
+            try {
+               boolean alreadyStarted = false;
+               AtomicBoolean ex = new AtomicBoolean(true);
+               while (running.get()) {
+                  try {
+                     // do not be tempted to use try (connection = factory.createConnection())
+                     // this is because we don't need to close the connection after a network failure on this test.
+                     Connection connection = factory.createConnection();
+
+                     synchronized (ConnectionDroppedTest.this) {
+                        runAfter(connection::close);
+                     }
+                     connection.setExceptionListener(new ExceptionListener() {
+                        @Override
+                        public void onException(JMSException exception) {
+                           ex.set(true);
+                        }
+                     });
+                     flagStart.await(60, TimeUnit.SECONDS);
+
+                     connection.start();
+
+                     Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+                     javax.jms.Queue jmsQueue = session.createQueue("test-queue");
+
+                     while (running.get() && !ex.get()) {
+                        if (!alreadyStarted) {
+                           alreadyStarted = true;
+                        }
+                        System.out.println("Consumer");
+                        MessageConsumer consumer = session.createConsumer(jmsQueue);
+                        Thread.sleep(500);
+                     }
+
+                     if (!protocol.equals("CORE")) {
+                        connection.close();
+                     }
+                  } catch (Exception e) {
+                     logger.debug(e.getMessage(), e);
+                  }
+               }
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      for (int i = 0; i < REPEATS; i++) {
+         try {
+            flagStart.await(60, TimeUnit.SECONDS); // align all the clients at the same spot
+         } catch (Throwable throwable) {
+            logger.info(ThreadDumpUtil.threadDump("timed out flagstart"));
+            throw throwable;
+         }
+
+         
logger.info("*******************************************************************************************************************************\nloop
 kill {}" + 
"\n*******************************************************************************************************************************",
 i);
+         server.getRemotingService().getConnections().forEach(r -> {
+            r.fail(new ActiveMQException("it's a simulation"));
+         });
+
+      }
+
+      running.set(false);
+      try {
+         flagStart.await(1, TimeUnit.SECONDS);
+      } catch (Exception ignored) {
+      }
+      if (!done.await(10, TimeUnit.SECONDS)) {
+         for (int i = 0; i < 10; i++) {
+            System.out.println("Will fail");
+            Thread.sleep(1000);
+         }
+         logger.warn(ThreadDumpUtil.threadDump("Still running"));
+         Assert.fail("Threads are still running");
+      }

Review Comment:
   This doesn't really make sense to me. It first waits up to 10 seconds for completion, then, if that doesn't succeed, it will _always_ burn another 10 seconds sitting in a loop printing 'Will fail' every second (why is there a mix of logging and System.out?), then log a "Still running" thread dump, and only then fail.
   
   Why burn the additional 10 seconds?
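A sketch of the alternative the question implies: wait once and, on timeout, capture the thread dump and fail immediately. It is JDK-only, so `Thread.getAllStackTraces()` stands in for Artemis's `ThreadDumpUtil.threadDump(...)`; the `AwaitOrFail` class and its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AwaitOrFail {

   // Waits once for the workers; on timeout, attaches a thread dump to the failure and
   // fails right away instead of spending extra time in a "Will fail" loop.
   static void awaitOrFail(CountDownLatch done, long timeout, TimeUnit unit) throws InterruptedException {
      if (done.await(timeout, unit)) {
         return;
      }
      throw new AssertionError("Threads are still running\n" + threadDump());
   }

   // JDK-only stand-in for ThreadDumpUtil.threadDump(...): name, state and stack of every live thread.
   static String threadDump() {
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
         sb.append('"').append(entry.getKey().getName()).append("\" ").append(entry.getKey().getState()).append('\n');
         for (StackTraceElement frame : entry.getValue()) {
            sb.append("    at ").append(frame).append('\n');
         }
      }
      return sb.toString();
   }
}
```

In the test, the whole `if (!done.await(10, TimeUnit.SECONDS)) { ... }` block would then reduce to one `awaitOrFail(done, 10, TimeUnit.SECONDS)` call; if the dump should also appear in the log, it can go through the SLF4J `logger` rather than `System.out`.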





Issue Time Tracking
-------------------

    Worklog Id:     (was: 889729)
    Time Spent: 1h 40m  (was: 1.5h)

> Connection Failure Race Conditions in AMQP and Core
> ---------------------------------------------------
>
>                 Key: ARTEMIS-4476
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4476
>             Project: ActiveMQ Artemis
>          Issue Type: Task
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Failure detection can race with the processing of client packets (or frames,
> in the case of AMQP).
> This is because Netty detects the failure and removes the connection objects
> while packets are still being processed.
> I was not able to reproduce this particular issue, but I have seen a memory
> dump in which a consumer was created after its connection had already been
> dropped, leaving the consumer isolated, with no communication with any client.
> I believe that particular case was made possible by these races.
> I am adding tests that exercise connection failure under stress, and with them
> I was able to reproduce other issues.
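The race described above is a window in which failure cleanup and consumer creation interleave. One common way to close such a window is to serialize both paths and have creation check a "failed" flag, so a consumer can never be added after cleanup has run. The sketch below models that with a hypothetical `GuardedConnection` class; it illustrates the general technique under those assumptions and is not the change made for ARTEMIS-4476.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Artemis code): the packet-processing path and the failure path
// both touch the same connection state, so both are serialized on the same monitor.
class GuardedConnection {

   private final List<String> consumers = new ArrayList<>();
   private boolean failed;

   // Packet/frame processing path: refuses to create a consumer once the connection has failed.
   synchronized boolean createConsumer(String name) {
      if (failed) {
         return false; // connection already dropped: reject instead of leaving an orphan behind
      }
      consumers.add(name);
      return true;
   }

   // Failure-detection path: marks the connection failed and removes its consumers.
   synchronized void fail() {
      failed = true;
      consumers.clear(); // stands in for closing the server-side consumers
   }

   synchronized int consumerCount() {
      return consumers.size();
   }
}
```

Because the two paths can run on different threads, checking the flag without a shared lock (or a single executor) would still leave a gap between the check and the add; holding the same monitor makes each creation happen either entirely before the cleanup or entirely after it.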



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
