[
https://issues.apache.org/jira/browse/ARTEMIS-4476?focusedWorklogId=889339&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-889339
]
ASF GitHub Bot logged work on ARTEMIS-4476:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Nov/23 21:28
Start Date: 07/Nov/23 21:28
Worklog Time Spent: 10m
Work Description: tabish121 commented on code in PR #4656:
URL: https://github.com/apache/activemq-artemis/pull/4656#discussion_r1385597343
##########
tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.isolated.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.ThreadDumpUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionDroppedTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public ConnectionDroppedTest() {
+ // this is the reason why I'm putting this test on the "isolated"
package.
+ disableCheckThread();
+ }
+
+ @Test(timeout = 20_000)
+ public void testConsumerDroppedWithProtonTestClient() throws Exception {
+ int NUMBER_OF_CONNECTIONS = 100;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ executorService.execute(() -> {
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.expectOpen();
+ peer.remoteBegin().queue();
+ peer.expectBegin();
+
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
"amqp:rejected:list").also().queue();
+ peer.dropAfterLastHandler(1000); // This closes the netty
connection after the attach is written
+ peer.connect("localhost", 61616);
+
+ // Waits for all the commands to fire and the drop action to be
run.
+ peer.waitForScriptToComplete();
+ } catch (Throwable e) {
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(0, errors.get());
+
+ Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+ }
+
+ @Test(timeout = 20_000)
+ public void testRegularClose() throws Exception {
+ int NUMBER_OF_CONNECTIONS = 100;
+ int REPEATS = 10;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+ AtomicInteger errors = new AtomicInteger(0);
+ AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
+ runAfter(loggerHandler::stop);
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ executorService.execute(() -> {
+ for (int r = 0; r < REPEATS; r++) {
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.expectOpen();
+ peer.remoteBegin().queue();
+ peer.expectBegin();
+
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
"amqp:rejected:list").also().queue();
+ peer.expectAttach();
+ peer.remoteClose().queue();
+ peer.expectClose();
+
+ peer.connect("localhost", 61616);
+
+ // Waits for all the commands to fire and the drop action to
be run.
+ peer.waitForScriptToComplete();
+ } catch (Throwable e) {
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ break;
+ }
+ }
+ done.countDown();
+ });
+ }
+
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(0, errors.get());
+
+ Assert.assertFalse(loggerHandler.findText("AMQ212037"));
+
+ // TODO: Fix these on a future commit
+ /*Assert.assertFalse(loggerHandler.findText("Connection failure"));
+ Assert.assertFalse(loggerHandler.findText("REMOTE_DISCONNECT"));
+ Assert.assertFalse(loggerHandler.findText("AMQ222061"));
+ Assert.assertFalse(loggerHandler.findText("AMQ222107")); */
+
+ Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+ Wait.assertEquals(0, server::getConnectionCount, 5000);
+
+ }
+
+ @Test
+ public void testConsumerDroppedAMQP() throws Throwable {
+ testConsumerDroppedWithRegularClient("AMQP");
+
+ }
+
+ @Test
+ public void testConsumerDroppedCORE() throws Throwable {
+ testConsumerDroppedWithRegularClient("CORE");
+ }
+
+ @Test
+ public void testConsumerDroppedOpenWire() throws Throwable {
+ testConsumerDroppedWithRegularClient("OPENWIRE");
+ }
+
+ public void testConsumerDroppedWithRegularClient(final String protocol)
throws Throwable {
+ int NUMBER_OF_CONNECTIONS = 25;
+ int REPEATS = 10;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ runAfter(() -> running.set(false));
+
+ CyclicBarrier flagStart = new CyclicBarrier(NUMBER_OF_CONNECTIONS + 1);
+ flagStart.reset();
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ final int t = i;
+ executorService.execute(() -> {
+ try {
+ boolean alreadyStarted = false;
+ AtomicBoolean ex = new AtomicBoolean(true);
+ while (running.get()) {
+ try {
+ // do not be tempted to use try (connection =
factory.createConnection())
+ // this is because we don't need to close the connection
after a network failure on this test.
+ Connection connection = factory.createConnection();
+
+ synchronized (ConnectionDroppedTest.this) {
+ runAfter(connection::close);
+ }
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ ex.set(true);
+ }
+ });
+ flagStart.await(60, TimeUnit.SECONDS);
+
+ connection.start();
+
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue jmsQueue =
session.createQueue("test-queue");
+
+ while (running.get() && !ex.get()) {
+ if (!alreadyStarted) {
+ alreadyStarted = true;
+ }
+ System.out.println("Consumer");
+ MessageConsumer consumer =
session.createConsumer(jmsQueue);
+ Thread.sleep(500);
+ }
+
+ if (!protocol.equals("CORE")) {
+ connection.close();
+ }
+ } catch (Exception e) {
+ logger.debug(e.getMessage(), e);
+ }
+ }
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ for (int i = 0; i < REPEATS; i++) {
+ try {
+ flagStart.await(60, TimeUnit.SECONDS); // align all the clients at
the same spot
+ } catch (Throwable throwable) {
+ logger.info(ThreadDumpUtil.threadDump("timed out flagstart"));
+ throw throwable;
+ }
+
+
logger.info("*******************************************************************************************************************************\nloop
kill {}" +
"\n*******************************************************************************************************************************",
i);
+ server.getRemotingService().getConnections().forEach(r -> {
+ r.fail(new ActiveMQException("it's a simulation"));
+ });
+
+ }
+
+ running.set(false);
+ try {
+ flagStart.await(1, TimeUnit.SECONDS);
+ } catch (Exception ignored) {
+ }
+ if (!done.await(10, TimeUnit.SECONDS)) {
+ for (int i = 0; i < 10; i++) {
+ System.out.println("Will fail");
+ Thread.sleep(1000);
+ }
+ logger.warn(ThreadDumpUtil.threadDump("Still running"));
+ Assert.fail("Threads are still running");
+ }
+
+ Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+
+ }
+
+ @Test
+ public void testDropConsumerProtonJ2TestClient() throws Throwable {
+ ReusableLatch latchCreating = new ReusableLatch(1);
+ ReusableLatch blockCreate = new ReusableLatch(1);
+ ReusableLatch done = new ReusableLatch(1);
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+
+ int TEST_REPEATS = 4;
+
+ server.registerBrokerPlugin(new ActiveMQServerSessionPlugin() {
+ @Override
+ public void beforeCreateSession(String name,
+ String username,
+ int minLargeMessageSize,
+ RemotingConnection connection,
+ boolean autoCommitSends,
+ boolean autoCommitAcks,
+ boolean preAcknowledge,
+ boolean xa,
+ String defaultAddress,
+ SessionCallback callback,
+ boolean autoCreateQueues,
+ OperationContext context,
+ Map<SimpleString, RoutingType>
prefixes) throws ActiveMQException {
+ latchCreating.countDown();
+ try {
+ blockCreate.await(10, TimeUnit.HOURS);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ } finally {
+ done.countDown();
+ }
+ }
+ });
+
+ AtomicBoolean running = new AtomicBoolean(true);
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ runAfter(executorService::shutdownNow);
+ runAfter(() -> running.set(false));
+ Queue serverQueue = server.createQueue(new
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setAddress(getName()).setAutoCreated(false));
+
+ for (int i = 0; i < TEST_REPEATS; i++) {
+ Assert.assertEquals(0, serverQueue.getConsumerCount());
+ latchCreating.setCount(1);
+ blockCreate.setCount(1);
+ done.setCount(1);
+
+ ProtonTestClient peer = new ProtonTestClient();
+
+ executorService.execute(() -> {
+
+ try {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.remoteBegin().queue();
+
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress(getName()).withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
"amqp:rejected:list").also().queue();
+
+ peer.connect("localhost", 61616);
+
+ // Waits for all the commands to fire and the drop action to be
run.
+ peer.waitForScriptToComplete();
Review Comment:
I might add a 'peer.expectOpen()' before the connect, as the wait action will
throw an exception if the broker emits an open before stalling on the begin
processing, which could lead to some confusing logging output at the least.
In this case, though, you could probably also just use a different wait method,
'peer.waitForScriptToCompleteIgnoreErrors()', which will just let it do what you
are asking and then shut down cleanly without any errors being thrown or logged.
##########
tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.isolated.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.ThreadDumpUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionDroppedTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public ConnectionDroppedTest() {
+ // this is the reason why I'm putting this test on the "isolated"
package.
+ disableCheckThread();
+ }
+
+ @Test(timeout = 20_000)
+ public void testConsumerDroppedWithProtonTestClient() throws Exception {
+ int NUMBER_OF_CONNECTIONS = 100;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ executorService.execute(() -> {
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.expectOpen();
+ peer.remoteBegin().queue();
+ peer.expectBegin();
+
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
"amqp:rejected:list").also().queue();
+ peer.dropAfterLastHandler(1000); // This closes the netty
connection after the attach is written
+ peer.connect("localhost", 61616);
+
+ // Waits for all the commands to fire and the drop action to be
run.
+ peer.waitForScriptToComplete();
+ } catch (Throwable e) {
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(0, errors.get());
+
+ Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+ }
+
+ @Test(timeout = 20_000)
+ public void testRegularClose() throws Exception {
+ int NUMBER_OF_CONNECTIONS = 100;
+ int REPEATS = 10;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+ AtomicInteger errors = new AtomicInteger(0);
+ AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
+ runAfter(loggerHandler::stop);
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ executorService.execute(() -> {
+ for (int r = 0; r < REPEATS; r++) {
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.expectOpen();
+ peer.remoteBegin().queue();
+ peer.expectBegin();
+
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
"amqp:rejected:list").also().queue();
+ peer.expectAttach();
+ peer.remoteClose().queue();
+ peer.expectClose();
+
+ peer.connect("localhost", 61616);
+
+ // Waits for all the commands to fire and the drop action to
be run.
Review Comment:
Comment isn't correct; no drop action is queued in this test.
##########
tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.isolated.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.ThreadDumpUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionDroppedTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public ConnectionDroppedTest() {
+ // this is the reason why I'm putting this test on the "isolated"
package.
+ disableCheckThread();
+ }
+
+ @Test(timeout = 20_000)
+ public void testConsumerDroppedWithProtonTestClient() throws Exception {
+ int NUMBER_OF_CONNECTIONS = 100;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ executorService.execute(() -> {
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.expectOpen();
+ peer.remoteBegin().queue();
+ peer.expectBegin();
+
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
"amqp:rejected:list").also().queue();
+ peer.dropAfterLastHandler(1000); // This closes the netty
connection after the attach is written
+ peer.connect("localhost", 61616);
+
+ // Waits for all the commands to fire and the drop action to be
run.
+ peer.waitForScriptToComplete();
+ } catch (Throwable e) {
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(0, errors.get());
+
+ Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+ }
+
+ @Test(timeout = 20_000)
+ public void testRegularClose() throws Exception {
+ int NUMBER_OF_CONNECTIONS = 100;
+ int REPEATS = 10;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+ AtomicInteger errors = new AtomicInteger(0);
+ AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
+ runAfter(loggerHandler::stop);
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ executorService.execute(() -> {
+ for (int r = 0; r < REPEATS; r++) {
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.expectOpen();
+ peer.remoteBegin().queue();
+ peer.expectBegin();
+
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
"amqp:rejected:list").also().queue();
+ peer.expectAttach();
+ peer.remoteClose().queue();
+ peer.expectClose();
+
+ peer.connect("localhost", 61616);
+
+ // Waits for all the commands to fire and the drop action to
be run.
+ peer.waitForScriptToComplete();
+ } catch (Throwable e) {
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ break;
+ }
+ }
+ done.countDown();
+ });
+ }
+
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(0, errors.get());
+
+ Assert.assertFalse(loggerHandler.findText("AMQ212037"));
+
+ // TODO: Fix these on a future commit
+ /*Assert.assertFalse(loggerHandler.findText("Connection failure"));
+ Assert.assertFalse(loggerHandler.findText("REMOTE_DISCONNECT"));
+ Assert.assertFalse(loggerHandler.findText("AMQ222061"));
+ Assert.assertFalse(loggerHandler.findText("AMQ222107")); */
+
+ Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+ Wait.assertEquals(0, server::getConnectionCount, 5000);
+
+ }
+
+ @Test
+ public void testConsumerDroppedAMQP() throws Throwable {
+ testConsumerDroppedWithRegularClient("AMQP");
+
+ }
+
+ @Test
+ public void testConsumerDroppedCORE() throws Throwable {
+ testConsumerDroppedWithRegularClient("CORE");
+ }
+
+ @Test
+ public void testConsumerDroppedOpenWire() throws Throwable {
+ testConsumerDroppedWithRegularClient("OPENWIRE");
+ }
+
+ public void testConsumerDroppedWithRegularClient(final String protocol)
throws Throwable {
+ int NUMBER_OF_CONNECTIONS = 25;
+ int REPEATS = 10;
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+ runAfter(executorService::shutdownNow);
+
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ runAfter(() -> running.set(false));
+
+ CyclicBarrier flagStart = new CyclicBarrier(NUMBER_OF_CONNECTIONS + 1);
+ flagStart.reset();
+
+ for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+ final int t = i;
+ executorService.execute(() -> {
+ try {
+ boolean alreadyStarted = false;
+ AtomicBoolean ex = new AtomicBoolean(true);
+ while (running.get()) {
+ try {
+ // do not be tempted to use try (connection =
factory.createConnection())
+ // this is because we don't need to close the connection
after a network failure on this test.
+ Connection connection = factory.createConnection();
+
+ synchronized (ConnectionDroppedTest.this) {
+ runAfter(connection::close);
+ }
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ ex.set(true);
+ }
+ });
+ flagStart.await(60, TimeUnit.SECONDS);
+
+ connection.start();
+
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue jmsQueue =
session.createQueue("test-queue");
+
+ while (running.get() && !ex.get()) {
+ if (!alreadyStarted) {
+ alreadyStarted = true;
+ }
+ System.out.println("Consumer");
+ MessageConsumer consumer =
session.createConsumer(jmsQueue);
+ Thread.sleep(500);
+ }
+
+ if (!protocol.equals("CORE")) {
+ connection.close();
+ }
+ } catch (Exception e) {
+ logger.debug(e.getMessage(), e);
+ }
+ }
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ for (int i = 0; i < REPEATS; i++) {
+ try {
+ flagStart.await(60, TimeUnit.SECONDS); // align all the clients at
the same spot
+ } catch (Throwable throwable) {
+ logger.info(ThreadDumpUtil.threadDump("timed out flagstart"));
+ throw throwable;
+ }
+
+
logger.info("*******************************************************************************************************************************\nloop
kill {}" +
"\n*******************************************************************************************************************************",
i);
+ server.getRemotingService().getConnections().forEach(r -> {
+ r.fail(new ActiveMQException("it's a simulation"));
+ });
+
+ }
+
+ running.set(false);
+ try {
+ flagStart.await(1, TimeUnit.SECONDS);
+ } catch (Exception ignored) {
+ }
+ if (!done.await(10, TimeUnit.SECONDS)) {
+ for (int i = 0; i < 10; i++) {
+ System.out.println("Will fail");
+ Thread.sleep(1000);
+ }
+ logger.warn(ThreadDumpUtil.threadDump("Still running"));
+ Assert.fail("Threads are still running");
+ }
+
+ Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+
+ }
+
+ @Test
+ public void testDropConsumerProtonJ2TestClient() throws Throwable {
+ ReusableLatch latchCreating = new ReusableLatch(1);
+ ReusableLatch blockCreate = new ReusableLatch(1);
+ ReusableLatch done = new ReusableLatch(1);
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+
+ int TEST_REPEATS = 4;
+
+ server.registerBrokerPlugin(new ActiveMQServerSessionPlugin() {
+ @Override
+ public void beforeCreateSession(String name,
+ String username,
+ int minLargeMessageSize,
+ RemotingConnection connection,
+ boolean autoCommitSends,
+ boolean autoCommitAcks,
+ boolean preAcknowledge,
+ boolean xa,
+ String defaultAddress,
+ SessionCallback callback,
+ boolean autoCreateQueues,
+ OperationContext context,
+ Map<SimpleString, RoutingType>
prefixes) throws ActiveMQException {
+ latchCreating.countDown();
+ try {
+ blockCreate.await(10, TimeUnit.HOURS);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ } finally {
+ done.countDown();
+ }
+ }
+ });
+
+ AtomicBoolean running = new AtomicBoolean(true);
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ runAfter(executorService::shutdownNow);
+ runAfter(() -> running.set(false));
+ Queue serverQueue = server.createQueue(new
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setAddress(getName()).setAutoCreated(false));
+
+ for (int i = 0; i < TEST_REPEATS; i++) {
+ Assert.assertEquals(0, serverQueue.getConsumerCount());
+ latchCreating.setCount(1);
+ blockCreate.setCount(1);
+ done.setCount(1);
+
+ ProtonTestClient peer = new ProtonTestClient();
+
+ executorService.execute(() -> {
+
+ try {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.remoteBegin().queue();
+
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress(getName()).withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
"amqp:rejected:list").also().queue();
+
+ peer.connect("localhost", 61616);
+
+ // Waits for all the commands to fire and the drop action to be
run.
Review Comment:
Comment is not correct; it seems to have been cut and pasted from another test.
No drop action was added in this test.
Issue Time Tracking
-------------------
Worklog Id: (was: 889339)
Time Spent: 40m (was: 0.5h)
> Connection Failure Race Conditions in AMQP and Core
> ---------------------------------------------------
>
> Key: ARTEMIS-4476
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4476
> Project: ActiveMQ Artemis
> Issue Type: Task
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Failure detection can race with the processing of the client packets
> (or frames in the case of AMQP).
> This is because Netty detects the failure and removes the connection objects
> while the packets are still being processed.
> I was not able to reproduce this particular issue, but I have seen a case
> from a memory dump where the consumer was created while the connection was
> already dropped, leaving the consumer isolated without any communication with
> clients.
> I can see how that particular case could happen because of these races.
> I am adding tests to exercise connection failure under stress, and I was able to
> reproduce other issues.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)