[ 
https://issues.apache.org/jira/browse/ARTEMIS-4476?focusedWorklogId=889754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-889754
 ]

ASF GitHub Bot logged work on ARTEMIS-4476:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Nov/23 15:15
            Start Date: 09/Nov/23 15:15
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4656:
URL: https://github.com/apache/activemq-artemis/pull/4656#discussion_r1388154723


##########
tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java:
##########
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.isolated.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.ThreadDumpUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionDroppedTest extends ActiveMQTestBase {
+
+   // lookupClass() evaluates to the class that contains this declaration, so the logger is named after this test.
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   /**
+    * Creates the test and calls {@code disableCheckThread()}; that call is the reason the author
+    * placed this test in the "isolated" package.
+    */
+   public ConnectionDroppedTest() {
+      this.disableCheckThread();
+   }
+
+   /**
+    * Opens NUMBER_OF_CONNECTIONS (100) concurrent AMQP connections with the protonj2
+    * {@link ProtonTestClient}. Each client scripts SASL ANONYMOUS, Open, Begin and a receiver
+    * Attach on "test-queue", and then has its connection dropped through
+    * {@code dropAfterLastHandler(1000)} instead of sending an AMQP Close.
+    * <p>
+    * NOTE(review): no {@code expectAttach()} is scripted here, so the drop is presumably not
+    * gated on the server's attach response. The 1000 is presumably a delay in milliseconds.
+    * Confirm both against the ProtonTestClient semantics.
+    * <p>
+    * Asserts that every client task finishes within 10 seconds, that none of them failed, and
+    * that the server-side queue's consumer count eventually reaches 0.
+    */
+   @Test(timeout = 20_000)
+   public void testConsumerDroppedWithProtonTestClient() throws Exception {
+      int NUMBER_OF_CONNECTIONS = 100;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new 
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      // One pool thread per connection, so all clients run concurrently.
+      ExecutorService executorService = 
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      // Counted down by every client task in its finally block, whether it succeeded or failed.
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         executorService.execute(() -> {
+            try (ProtonTestClient peer = new ProtonTestClient()) {
+               peer.queueClientSaslAnonymousConnect();
+               peer.remoteOpen().queue();
+               peer.expectOpen();
+               peer.remoteBegin().queue();
+               peer.expectBegin();
+               // Receiver link on "test-queue"; a random link name keeps each client's link distinct.
+               
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
 "amqp:rejected:list").also().queue();
+               peer.dropAfterLastHandler(1000); // This closes the netty 
connection after the attach is written
+               peer.connect("localhost", 61616);
+
+               // Waits for all the commands to fire and the drop action to be 
run.
+               peer.waitForScriptToComplete();
+            } catch (Throwable e) {
+               errors.incrementAndGet();
+               logger.warn(e.getMessage(), e);
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, errors.get());
+
+      // The server must eventually remove the consumer belonging to every dropped connection.
+      Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+   }
+
+   /**
+    * Runs NUMBER_OF_CONNECTIONS (100) concurrent tasks. Each task performs REPEATS (10) sequential
+    * AMQP exchanges with the protonj2 {@link ProtonTestClient}: SASL ANONYMOUS, Open, Begin, a
+    * receiver Attach on "test-queue" (waiting for the server's Attach), and then a clean Close
+    * (waiting for the server's Close). A task stops repeating after its first error.
+    * <p>
+    * Asserts that all tasks finish within 10 seconds with no errors, that the captured log does
+    * not contain AMQ212037, and that both the queue's consumer count and the server's connection
+    * count eventually reach 0.
+    */
+   @Test(timeout = 20_000)
+   public void testRegularClose() throws Exception {
+      int NUMBER_OF_CONNECTIONS = 100;
+      int REPEATS = 10;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new 
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+      AtomicInteger errors = new AtomicInteger(0);
+      // Captures log output so the findText(...) assertions below can inspect it.
+      AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
+      runAfter(loggerHandler::stop);
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         executorService.execute(() -> {
+            for (int r = 0; r < REPEATS; r++) {
+               try (ProtonTestClient peer = new ProtonTestClient()) {
+                  peer.queueClientSaslAnonymousConnect();
+                  peer.remoteOpen().queue();
+                  peer.expectOpen();
+                  peer.remoteBegin().queue();
+                  peer.expectBegin();
+                  // Receiver link on "test-queue"; a random link name keeps each attach distinct.
+                  
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
 "amqp:rejected:list").also().queue();
+                  peer.expectAttach();
+                  peer.remoteClose().queue();
+                  peer.expectClose();
+
+                  peer.connect("localhost", 61616);
+
+                  peer.waitForScriptToComplete();
+               } catch (Throwable e) {
+                  errors.incrementAndGet();
+                  logger.warn(e.getMessage(), e);
+                  break;
+               }
+            }
+            // Always reached, even without a finally: the catch above handles every Throwable from the script and from closing the client.
+            done.countDown();
+         });
+      }
+
+      Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, errors.get());
+
+      // Clean closes are expected not to log AMQ212037.
+      Assert.assertFalse(loggerHandler.findText("AMQ212037"));
+
+      // TODO: Fix these as part of ARTEMIS-4483
+      /*Assert.assertFalse(loggerHandler.findText("Connection failure"));
+      Assert.assertFalse(loggerHandler.findText("REMOTE_DISCONNECT"));
+      Assert.assertFalse(loggerHandler.findText("AMQ222061"));
+      Assert.assertFalse(loggerHandler.findText("AMQ222107")); */
+
+      // After all clean closes, no consumers or connections may linger on the server.
+      Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+      Wait.assertEquals(0, server::getConnectionCount, 5000);
+
+   }
+
+   /**
+    * Runs {@code testConsumerDroppedWithRegularClient} with the "AMQP" protocol.
+    */
+   @Test
+   public void testConsumerDroppedAMQP() throws Throwable {
+      this.testConsumerDroppedWithRegularClient("AMQP");
+   }
+
+   /**
+    * Runs {@code testConsumerDroppedWithRegularClient} with the "CORE" protocol.
+    */
+   @Test
+   public void testConsumerDroppedCORE() throws Throwable {
+      this.testConsumerDroppedWithRegularClient("CORE");
+   }
+
+   /**
+    * Runs {@code testConsumerDroppedWithRegularClient} with the "OPENWIRE" protocol.
+    */
+   @Test
+   public void testConsumerDroppedOpenWire() throws Throwable {
+      this.testConsumerDroppedWithRegularClient("OPENWIRE");
+   }
+
+   public void testConsumerDroppedWithRegularClient(final String protocol) 
throws Throwable {
+      int NUMBER_OF_CONNECTIONS = 25;
+      int REPEATS = 10;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new 
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+
+      final AtomicBoolean running = new AtomicBoolean(true);
+
+      runAfter(() -> running.set(false));
+
+      CyclicBarrier flagStart = new CyclicBarrier(NUMBER_OF_CONNECTIONS + 1);
+      flagStart.reset();
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         final int t = i;
+         executorService.execute(() -> {
+            try {
+               boolean alreadyStarted = false;
+               AtomicBoolean ex = new AtomicBoolean(true);
+               while (running.get()) {
+                  try {
+                     // do not be tempted to use try (connection = 
factory.createConnection())
+                     // this is because we don't need to close the connection 
after a network failure on this test.
+                     Connection connection = factory.createConnection();
+
+                     synchronized (ConnectionDroppedTest.this) {
+                        runAfter(connection::close);
+                     }
+                     connection.setExceptionListener(new ExceptionListener() {
+                        @Override
+                        public void onException(JMSException exception) {
+                           ex.set(true);
+                        }
+                     });
+                     flagStart.await(60, TimeUnit.SECONDS);
+
+                     connection.start();
+
+                     Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+                     javax.jms.Queue jmsQueue = 
session.createQueue("test-queue");
+
+                     while (running.get() && !ex.get()) {
+                        if (!alreadyStarted) {
+                           alreadyStarted = true;
+                        }
+                        System.out.println("Consumer");
+                        MessageConsumer consumer = 
session.createConsumer(jmsQueue);
+                        Thread.sleep(500);
+                     }
+
+                     if (!protocol.equals("CORE")) {
+                        connection.close();
+                     }
+                  } catch (Exception e) {
+                     logger.debug(e.getMessage(), e);
+                  }
+               }
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      for (int i = 0; i < REPEATS; i++) {
+         try {
+            flagStart.await(60, TimeUnit.SECONDS); // align all the clients at 
the same spot
+         } catch (Throwable throwable) {
+            logger.info(ThreadDumpUtil.threadDump("timed out flagstart"));
+            throw throwable;
+         }
+
+         
logger.info("*******************************************************************************************************************************\nloop
 kill {}" + 
"\n*******************************************************************************************************************************",
 i);
+         server.getRemotingService().getConnections().forEach(r -> {
+            r.fail(new ActiveMQException("it's a simulation"));
+         });
+
+      }
+
+      running.set(false);
+      try {
+         flagStart.await(1, TimeUnit.SECONDS);
+      } catch (Exception ignored) {
+      }
+      if (!done.await(10, TimeUnit.SECONDS)) {
+         for (int i = 0; i < 10; i++) {
+            System.out.println("Will fail");
+            Thread.sleep(1000);
+         }
+         logger.warn(ThreadDumpUtil.threadDump("Still running"));
+         Assert.fail("Threads are still running");
+      }

Review Comment:
   I've told you a few times that that's almost exactly what I actually do: //TODO: remove :)





Issue Time Tracking
-------------------

    Worklog Id:     (was: 889754)
    Time Spent: 2h 40m  (was: 2.5h)

> Connection Failure Race Conditions in AMQP and Core
> ---------------------------------------------------
>
>                 Key: ARTEMIS-4476
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4476
>             Project: ActiveMQ Artemis
>          Issue Type: Task
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Failure detection can race with the processing of client packets (or frames,
> in the case of AMQP).
> This is because Netty detects the failure and removes the connection objects
> while the packets are still being processed.
> I was not able to reproduce this particular issue, but I have seen a case in
> a memory dump where the consumer was created after the connection had
> already been dropped, leaving the consumer isolated, with no communication
> with any client.
> I can see how that particular case could arise from these races.
> I am adding tests that stress connection failure, and with them I was able
> to reproduce other issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to