[
https://issues.apache.org/jira/browse/ARTEMIS-4476?focusedWorklogId=889739&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-889739
]
ASF GitHub Bot logged work on ARTEMIS-4476:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 09/Nov/23 14:28
Start Date: 09/Nov/23 14:28
Worklog Time Spent: 10m
Work Description: clebertsuconic commented on code in PR #4656:
URL: https://github.com/apache/activemq-artemis/pull/4656#discussion_r1388085854
##########
tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java:
##########
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.isolated.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.ThreadDumpUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionDroppedTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public ConnectionDroppedTest() {
+ // this is the reason why I'm putting this test on the "isolated"
package.
+ disableCheckThread();
+ }
+
+ @Test(timeout = 20_000)
+ public void testConsumerDroppedWithProtonTestClient() throws Exception {
+ int NUMBER_OF_CONNECTIONS = 100;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ executorService.execute(() -> {
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.expectOpen();
+ peer.remoteBegin().queue();
+ peer.expectBegin();
+
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
"amqp:rejected:list").also().queue();
+ peer.dropAfterLastHandler(1000); // This closes the netty
connection after the attach is written
+ peer.connect("localhost", 61616);
+
+ // Waits for all the commands to fire and the drop action to be
run.
+ peer.waitForScriptToComplete();
+ } catch (Throwable e) {
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(0, errors.get());
+
+ Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+ }
+
+ @Test(timeout = 20_000)
+ public void testRegularClose() throws Exception {
+ int NUMBER_OF_CONNECTIONS = 100;
+ int REPEATS = 10;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+ AtomicInteger errors = new AtomicInteger(0);
+ AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
+ runAfter(loggerHandler::stop);
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ executorService.execute(() -> {
+ for (int r = 0; r < REPEATS; r++) {
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.expectOpen();
+ peer.remoteBegin().queue();
+ peer.expectBegin();
+
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
"amqp:rejected:list").also().queue();
+ peer.expectAttach();
+ peer.remoteClose().queue();
+ peer.expectClose();
+
+ peer.connect("localhost", 61616);
+
+ peer.waitForScriptToComplete();
+ } catch (Throwable e) {
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ break;
+ }
+ }
+ done.countDown();
+ });
+ }
+
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(0, errors.get());
+
+ Assert.assertFalse(loggerHandler.findText("AMQ212037"));
+
+ // TODO: Fix these as part of ARTEMIS-4483
+ /*Assert.assertFalse(loggerHandler.findText("Connection failure"));
+ Assert.assertFalse(loggerHandler.findText("REMOTE_DISCONNECT"));
+ Assert.assertFalse(loggerHandler.findText("AMQ222061"));
+ Assert.assertFalse(loggerHandler.findText("AMQ222107")); */
+
+ Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+ Wait.assertEquals(0, server::getConnectionCount, 5000);
+
+ }
+
+ @Test
+ public void testConsumerDroppedAMQP() throws Throwable {
+ testConsumerDroppedWithRegularClient("AMQP");
+
+ }
+
+ @Test
+ public void testConsumerDroppedCORE() throws Throwable {
+ testConsumerDroppedWithRegularClient("CORE");
+ }
+
+ @Test
+ public void testConsumerDroppedOpenWire() throws Throwable {
+ testConsumerDroppedWithRegularClient("OPENWIRE");
+ }
+
+ public void testConsumerDroppedWithRegularClient(final String protocol)
throws Throwable {
+ int NUMBER_OF_CONNECTIONS = 25;
+ int REPEATS = 10;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ runAfter(() -> running.set(false));
+
+ CyclicBarrier flagStart = new CyclicBarrier(NUMBER_OF_CONNECTIONS + 1);
+ flagStart.reset();
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ final int t = i;
+ executorService.execute(() -> {
+ try {
+ boolean alreadyStarted = false;
+ AtomicBoolean ex = new AtomicBoolean(true);
+ while (running.get()) {
+ try {
+ // do not be tempted to use try (connection =
factory.createConnection())
+ // this is because we don't need to close the connection
after a network failure on this test.
+ Connection connection = factory.createConnection();
+
+ synchronized (ConnectionDroppedTest.this) {
+ runAfter(connection::close);
+ }
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ ex.set(true);
+ }
+ });
+ flagStart.await(60, TimeUnit.SECONDS);
+
+ connection.start();
+
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue jmsQueue =
session.createQueue("test-queue");
+
+ while (running.get() && !ex.get()) {
+ if (!alreadyStarted) {
+ alreadyStarted = true;
+ }
+ System.out.println("Consumer");
+ MessageConsumer consumer =
session.createConsumer(jmsQueue);
+ Thread.sleep(500);
+ }
+
+ if (!protocol.equals("CORE")) {
+ connection.close();
+ }
+ } catch (Exception e) {
+ logger.debug(e.getMessage(), e);
+ }
+ }
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ for (int i = 0; i < REPEATS; i++) {
+ try {
+ flagStart.await(60, TimeUnit.SECONDS); // align all the clients at
the same spot
+ } catch (Throwable throwable) {
+ logger.info(ThreadDumpUtil.threadDump("timed out flagstart"));
+ throw throwable;
+ }
+
+
logger.info("*******************************************************************************************************************************\nloop
kill {}" +
"\n*******************************************************************************************************************************",
i);
+ server.getRemotingService().getConnections().forEach(r -> {
+ r.fail(new ActiveMQException("it's a simulation"));
+ });
+
+ }
+
+ running.set(false);
+ try {
+ flagStart.await(1, TimeUnit.SECONDS);
+ } catch (Exception ignored) {
+ }
+ if (!done.await(10, TimeUnit.SECONDS)) {
+ for (int i = 0; i < 10; i++) {
+ System.out.println("Will fail");
+ Thread.sleep(1000);
+ }
+ logger.warn(ThreadDumpUtil.threadDump("Still running"));
+ Assert.fail("Threads are still running");
+ }
Review Comment:
I only use System.out.println() for temporary debug statements... I forgot
to remove it.
Issue Time Tracking
-------------------
Worklog Id: (was: 889739)
Time Spent: 2h 10m (was: 2h)
> Connection Failure Race Conditions in AMQP and Core
> ---------------------------------------------------
>
> Key: ARTEMIS-4476
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4476
> Project: ActiveMQ Artemis
> Issue Type: Task
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Failure detection can race with the processing of client packets (or
> frames, in the case of AMQP).
> This is because Netty detects the failure and removes the connection objects
> while the packets are still being processed.
> I was not able to reproduce this particular issue, but I have seen a case
> from a memory dump where the consumer was created while the connection was
> already dropped, leaving the consumer isolated without any communication with
> clients.
> I can see how that particular case would be possible because of these races.
> I am adding tests to exercise connection failure under stress, and with them
> I was able to reproduce other issues.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)