gemmellr commented on code in PR #4656:
URL: https://github.com/apache/activemq-artemis/pull/4656#discussion_r1387958178


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java:
##########
@@ -937,6 +937,7 @@ public void 
testUnhandledRemoteReceiverCloseConditionCausesConnectionRebuild() t
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
             peer.expectDetach().optional(); // Broker is not consistent on 
sending the detach
+            peer.expectClose().optional();

Review Comment:
   Ditto



##########
tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionDroppedLeakTest.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.leak;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.ServerStatus;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.activemq.artemis.tests.leak.MemoryAssertions.assertMemory;
+import static 
org.apache.activemq.artemis.tests.leak.MemoryAssertions.basicMemoryAsserts;
+
+public class ConnectionDroppedLeakTest extends ActiveMQTestBase {
+
+   private ConnectionFactory createConnectionFactory(String protocol) {
+      if (protocol.equals("AMQP")) {
+         return CFUtil.createConnectionFactory("AMQP", 
"amqp://localhost:61616?amqp.idleTimeout=120000&failover.maxReconnectAttempts=1&jms.prefetchPolicy.all=10&jms.forceAsyncAcks=true");
+      } else {
+         return CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      }
+   }
+
+   private static final String QUEUE_NAME = "QUEUE_DROP";
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   ActiveMQServer server;
+
+   Queue serverQueue;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+      Assume.assumeTrue(CheckLeak.isLoaded());
+   }
+
+   @After
+   public void validateServer() throws Exception {
+      CheckLeak checkLeak = new CheckLeak();
+
+      // I am doing this check here because the test method might hold a 
client connection
+      // so this check has to be done after the test, and before the server is 
stopped
+      assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName());
+
+      server.stop();
+
+      server = null;
+      serverQueue = null;
+
+      clearServers();
+      ServerStatus.clear();
+
+      assertMemory(checkLeak, 0, ActiveMQServerImpl.class.getName());
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      server = createServer(true, createDefaultConfig(1, true));
+      server.getConfiguration().setJournalPoolFiles(4).setJournalMinFiles(2);
+      server.start();
+      server.addAddressInfo(new 
AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
+
+      serverQueue = server.createQueue(new 
QueueConfiguration().setAddress(QUEUE_NAME).setName(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+   }
+
+   @Test
+   public void testDropConnectionsAMQP() throws Exception {
+      doDropConnections("AMQP");
+   }
+
+   @Test
+   public void testDropConnectionsCORE() throws Exception {
+      doDropConnections("CORE");
+   }
+
+   @Test
+   public void testDropConnectionsOPENWIRE() throws Exception {
+      doDropConnections("OPENWIRE");
+   }
+
+   private void doDropConnections(String protocol) throws Exception {
+      basicMemoryAsserts();
+
+      CountDownLatch latchDone = new CountDownLatch(2);
+      CountDownLatch latchReceived = new CountDownLatch(50);
+      AtomicInteger errors = new AtomicInteger(2);
+      AtomicBoolean running = new AtomicBoolean(true);
+
+      ExecutorService executorService = Executors.newFixedThreadPool(2);
+      runAfter(executorService::shutdownNow);
+      runAfter(() -> running.set(false));
+
+      executorService.execute(() -> {
+         ConnectionFactory cf = createConnectionFactory(protocol);
+         Connection connection = null;
+         try {
+            connection = cf.createConnection(); // I will leave this open on 
purpose
+            connection.start();
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            connection.start();
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
+            while (running.get()) {
+               Message message = consumer.receive(100);
+               if (message != null) {
+                  latchReceived.countDown();
+                  session.commit();
+               }
+            }
+         } catch (Exception e) {
+            errors.incrementAndGet();
+         } finally {
+            if (protocol.equals("OPENWIRE")) {
+               try {
+                  connection.close();// only closing the openwire as it would 
leave a hanging thread
+               } catch (Throwable ignored) {
+               }
+            }
+            latchDone.countDown();
+         }
+      });
+
+      executorService.execute(() -> {
+         ConnectionFactory cf = createConnectionFactory(protocol);
+         Connection connection = null;
+         try {
+            connection = cf.createConnection(); // I will leave this open on 
purpose
+            connection.start();
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            connection.start();

Review Comment:
   ditto



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java:
##########
@@ -219,6 +219,7 @@ public void 
testFederationCreatesControlLinkAndClosesConnectionDetachIndicatesNo
                                        .withNullTarget();
          peer.remoteDetach().withErrorCondition("amqp:unauthorized-access", 
"Not authroized").queue();
          peer.expectDetach().optional();
+         peer.expectClose().optional();

Review Comment:
   Seems off for a test which is explicitly meant to be checking the connection 
is closed, to make receiving that close optional.



##########
tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionDroppedLeakTest.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.leak;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.ServerStatus;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.activemq.artemis.tests.leak.MemoryAssertions.assertMemory;
+import static 
org.apache.activemq.artemis.tests.leak.MemoryAssertions.basicMemoryAsserts;
+
+public class ConnectionDroppedLeakTest extends ActiveMQTestBase {
+
+   private ConnectionFactory createConnectionFactory(String protocol) {
+      if (protocol.equals("AMQP")) {
+         return CFUtil.createConnectionFactory("AMQP", 
"amqp://localhost:61616?amqp.idleTimeout=120000&failover.maxReconnectAttempts=1&jms.prefetchPolicy.all=10&jms.forceAsyncAcks=true");
+      } else {
+         return CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      }
+   }
+
+   private static final String QUEUE_NAME = "QUEUE_DROP";
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   ActiveMQServer server;
+
+   Queue serverQueue;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+      Assume.assumeTrue(CheckLeak.isLoaded());
+   }
+
+   @After
+   public void validateServer() throws Exception {
+      CheckLeak checkLeak = new CheckLeak();
+
+      // I am doing this check here because the test method might hold a 
client connection
+      // so this check has to be done after the test, and before the server is 
stopped
+      assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName());
+
+      server.stop();
+
+      server = null;
+      serverQueue = null;
+
+      clearServers();
+      ServerStatus.clear();
+
+      assertMemory(checkLeak, 0, ActiveMQServerImpl.class.getName());
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      server = createServer(true, createDefaultConfig(1, true));
+      server.getConfiguration().setJournalPoolFiles(4).setJournalMinFiles(2);
+      server.start();
+      server.addAddressInfo(new 
AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
+
+      serverQueue = server.createQueue(new 
QueueConfiguration().setAddress(QUEUE_NAME).setName(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+   }
+
+   @Test
+   public void testDropConnectionsAMQP() throws Exception {
+      doDropConnections("AMQP");
+   }
+
+   @Test
+   public void testDropConnectionsCORE() throws Exception {
+      doDropConnections("CORE");
+   }
+
+   @Test
+   public void testDropConnectionsOPENWIRE() throws Exception {
+      doDropConnections("OPENWIRE");
+   }
+
+   private void doDropConnections(String protocol) throws Exception {
+      basicMemoryAsserts();
+
+      CountDownLatch latchDone = new CountDownLatch(2);
+      CountDownLatch latchReceived = new CountDownLatch(50);
+      AtomicInteger errors = new AtomicInteger(2);
+      AtomicBoolean running = new AtomicBoolean(true);
+
+      ExecutorService executorService = Executors.newFixedThreadPool(2);
+      runAfter(executorService::shutdownNow);
+      runAfter(() -> running.set(false));
+
+      executorService.execute(() -> {
+         ConnectionFactory cf = createConnectionFactory(protocol);
+         Connection connection = null;
+         try {
+            connection = cf.createConnection(); // I will leave this open on 
purpose
+            connection.start();
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            connection.start();

Review Comment:
   duplicate connection start



##########
tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java:
##########
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.isolated.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.ThreadDumpUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionDroppedTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public ConnectionDroppedTest() {
+      // this is the reason why I'm putting this test on the "isolated" 
package.
+      disableCheckThread();
+   }
+
+   @Test(timeout = 20_000)
+   public void testConsumerDroppedWithProtonTestClient() throws Exception {
+      int NUMBER_OF_CONNECTIONS = 100;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new 
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         executorService.execute(() -> {
+            try (ProtonTestClient peer = new ProtonTestClient()) {
+               peer.queueClientSaslAnonymousConnect();
+               peer.remoteOpen().queue();
+               peer.expectOpen();
+               peer.remoteBegin().queue();
+               peer.expectBegin();
+               
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
 "amqp:rejected:list").also().queue();
+               peer.dropAfterLastHandler(1000); // This closes the netty 
connection after the attach is written
+               peer.connect("localhost", 61616);
+
+               // Waits for all the commands to fire and the drop action to be 
run.
+               peer.waitForScriptToComplete();
+            } catch (Throwable e) {
+               errors.incrementAndGet();
+               logger.warn(e.getMessage(), e);
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, errors.get());
+
+      Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+   }
+
+   @Test(timeout = 20_000)
+   public void testRegularClose() throws Exception {
+      int NUMBER_OF_CONNECTIONS = 100;
+      int REPEATS = 10;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new 
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+      AtomicInteger errors = new AtomicInteger(0);
+      AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
+      runAfter(loggerHandler::stop);
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         executorService.execute(() -> {
+            for (int r = 0; r < REPEATS; r++) {
+               try (ProtonTestClient peer = new ProtonTestClient()) {
+                  peer.queueClientSaslAnonymousConnect();
+                  peer.remoteOpen().queue();
+                  peer.expectOpen();
+                  peer.remoteBegin().queue();
+                  peer.expectBegin();
+                  
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
 "amqp:rejected:list").also().queue();
+                  peer.expectAttach();
+                  peer.remoteClose().queue();
+                  peer.expectClose();
+
+                  peer.connect("localhost", 61616);
+
+                  peer.waitForScriptToComplete();
+               } catch (Throwable e) {
+                  errors.incrementAndGet();
+                  logger.warn(e.getMessage(), e);
+                  break;
+               }
+            }
+            done.countDown();
+         });
+      }
+
+      Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, errors.get());
+
+      Assert.assertFalse(loggerHandler.findText("AMQ212037"));
+
+      // TODO: Fix these as part of ARTEMIS-4483
+      /*Assert.assertFalse(loggerHandler.findText("Connection failure"));
+      Assert.assertFalse(loggerHandler.findText("REMOTE_DISCONNECT"));
+      Assert.assertFalse(loggerHandler.findText("AMQ222061"));
+      Assert.assertFalse(loggerHandler.findText("AMQ222107")); */
+
+      Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+      Wait.assertEquals(0, server::getConnectionCount, 5000);
+
+   }
+
+   @Test
+   public void testConsumerDroppedAMQP() throws Throwable {
+      testConsumerDroppedWithRegularClient("AMQP");
+
+   }
+
+   @Test
+   public void testConsumerDroppedCORE() throws Throwable {
+      testConsumerDroppedWithRegularClient("CORE");
+   }
+
+   @Test
+   public void testConsumerDroppedOpenWire() throws Throwable {
+      testConsumerDroppedWithRegularClient("OPENWIRE");
+   }
+
+   public void testConsumerDroppedWithRegularClient(final String protocol) 
throws Throwable {
+      int NUMBER_OF_CONNECTIONS = 25;
+      int REPEATS = 10;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new 
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+
+      final AtomicBoolean running = new AtomicBoolean(true);
+
+      runAfter(() -> running.set(false));
+
+      CyclicBarrier flagStart = new CyclicBarrier(NUMBER_OF_CONNECTIONS + 1);
+      flagStart.reset();
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         final int t = i;
+         executorService.execute(() -> {
+            try {
+               boolean alreadyStarted = false;
+               AtomicBoolean ex = new AtomicBoolean(true);
+               while (running.get()) {
+                  try {
+                     // do not be tempted to use try (connection = 
factory.createConnection())
+                     // this is because we don't need to close the connection 
after a network failure on this test.
+                     Connection connection = factory.createConnection();
+
+                     synchronized (ConnectionDroppedTest.this) {
+                        runAfter(connection::close);
+                     }
+                     connection.setExceptionListener(new ExceptionListener() {
+                        @Override
+                        public void onException(JMSException exception) {
+                           ex.set(true);
+                        }
+                     });
+                     flagStart.await(60, TimeUnit.SECONDS);
+
+                     connection.start();
+
+                     Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+                     javax.jms.Queue jmsQueue = 
session.createQueue("test-queue");
+
+                     while (running.get() && !ex.get()) {
+                        if (!alreadyStarted) {
+                           alreadyStarted = true;
+                        }
+                        System.out.println("Consumer");
+                        MessageConsumer consumer = 
session.createConsumer(jmsQueue);
+                        Thread.sleep(500);
+                     }
+
+                     if (!protocol.equals("CORE")) {
+                        connection.close();
+                     }
+                  } catch (Exception e) {
+                     logger.debug(e.getMessage(), e);
+                  }
+               }
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      for (int i = 0; i < REPEATS; i++) {
+         try {
+            flagStart.await(60, TimeUnit.SECONDS); // align all the clients at 
the same spot
+         } catch (Throwable throwable) {
+            logger.info(ThreadDumpUtil.threadDump("timed out flagstart"));
+            throw throwable;
+         }
+
+         
logger.info("*******************************************************************************************************************************\nloop
 kill {}" + 
"\n*******************************************************************************************************************************",
 i);
+         server.getRemotingService().getConnections().forEach(r -> {
+            r.fail(new ActiveMQException("it's a simulation"));
+         });
+
+      }
+
+      running.set(false);
+      try {
+         flagStart.await(1, TimeUnit.SECONDS);
+      } catch (Exception ignored) {
+      }
+      if (!done.await(10, TimeUnit.SECONDS)) {
+         for (int i = 0; i < 10; i++) {
+            System.out.println("Will fail");
+            Thread.sleep(1000);
+         }
+         logger.warn(ThreadDumpUtil.threadDump("Still running"));
+         Assert.fail("Threads are still running");
+      }

Review Comment:
   This doesnt really make sense to me. It first waits up to 10 seconds for 
completion, then if that doesnt succeed, it will just _always_ burn 10 seconds 
sitting in a loop saying 'Will fail' every second (why is there a mix of 
logging and system.out?), and then after that further log a threaddump "still 
running" and finally fail?
   
   Why burn the additional 10 seconds?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to