[
https://issues.apache.org/jira/browse/ARTEMIS-3502?focusedWorklogId=658920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-658920
]
ASF GitHub Bot logged work on ARTEMIS-3502:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Oct/21 12:22
Start Date: 01/Oct/21 12:22
Worklog Time Spent: 10m
Work Description: clebertsuconic commented on a change in pull request
#3777:
URL: https://github.com/apache/activemq-artemis/pull/3777#discussion_r720199512
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.logging.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AutoCreateTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(AutoCreateTest.class);
+
+ public final SimpleString addressA = new SimpleString("addressA");
+ public final SimpleString queueA = new SimpleString("queueA");
+
+ private ActiveMQServer server;
+
+ @After
+ public void clearLogg() {
+ AssertionLoggerHandler.stopCapture();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setAddressQueueScanPeriod(10000);
+ AddressSettings settings = new
AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true);
+
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ }
+
+ @Test
+ public void testAutoCreateDeleteRecreate() throws Exception {
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ int THREADS = 10;
+ for (int i = 0; i < 200; i++) {
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+
logger.debug("*******************************************************************************************************************************");
+ logger.debug("run " + i);
+ CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
+ CountDownLatch done = new CountDownLatch(THREADS);
+ Runnable consumerThread = () -> {
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ barrier.await(10, TimeUnit.SECONDS);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ };
+
+ Thread[] threads = new Thread[THREADS];
+ for (int j = 0; j < threads.length; j++) {
+ threads[j] = new Thread(consumerThread);
+ threads[j].start();
+ }
+
+ barrier.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("hello"));
+
+ Assert.assertNotNull(consumer.receive(5000));
+ }
+
+ }
+ }
+
+ @Test
+ public void testSweep() throws Exception {
+
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ }
+
+ AddressInfo info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
+ Assert.assertNotNull(info);
+ Assert.assertTrue(info.isAutoCreated());
+
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112"));
+ Assert.assertTrue("Queue name should be mentioned on logs",
AssertionLoggerHandler.findText(QUEUE_NAME));
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); // we
need another sweep to remove it
+ }
+
+ @Test
+ public void testSweepAddress() throws Exception {
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ AddressSettings settings = new
AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ server.start();
+ String ADDRESS_NAME = "autocreatedmulticast";
+
+ AddressInfo info = new
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
+ server.getPostOffice().addAddressInfo(info);
+ info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(ADDRESS_NAME);
+ session.createConsumer(topic);
+ }
+
+ { // just a namespace area
+ final AddressInfo infoRef = info;
+ Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
+ }
+
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Thread.sleep(50);
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Assert.assertTrue(info.isSwept());
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113"));
+ }
+
+
+ @Test
+ public void testNegativeSweepAddress() throws Exception {
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ AddressSettings settings = new
AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ server.start();
+ String ADDRESS_NAME = "autocreatedmulticast";
+
+ AddressInfo info = new
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
+ server.getPostOffice().addAddressInfo(info);
+ info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(ADDRESS_NAME);
+ session.createConsumer(topic);
+ }
+
+ { // just a namespace area
+ final AddressInfo infoRef = info;
+ Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
+ }
+
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Thread.sleep(50);
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Assert.assertTrue(info.isSwept());
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(ADDRESS_NAME);
+ session.createConsumer(topic);
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Assert.assertFalse(info.isSwept()); // it should be cleared because
there is a consumer now
Review comment:
It should, because I disabled the reaper and am only running it by hand.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 658920)
Time Spent: 1h 10m (was: 1h)
> Auto delete of a queue could lead to inconsistencies
> ----------------------------------------------------
>
> Key: ARTEMIS-3502
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3502
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Auto delete might happen after consumers disconnect.
> In a situation where the consumer was up to date, the queue could be removed,
> leading to a few issues.
> We should improve how we handle auto-delete and avoid these
> inconsistencies.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)