[
https://issues.apache.org/jira/browse/ARTEMIS-3502?focusedWorklogId=659561&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-659561
]
ASF GitHub Bot logged work on ARTEMIS-3502:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 04/Oct/21 13:29
Start Date: 04/Oct/21 13:29
Worklog Time Spent: 10m
Work Description: gemmellr commented on a change in pull request #3777:
URL: https://github.com/apache/activemq-artemis/pull/3777#discussion_r721365734
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.logging.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AutoCreateTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(AutoCreateTest.class);
+
+ public final SimpleString addressA = new SimpleString("addressA");
+ public final SimpleString queueA = new SimpleString("queueA");
+
+ private ActiveMQServer server;
+
+ @After
+ public void clearLogg() {
+ AssertionLoggerHandler.stopCapture();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setAddressQueueScanPeriod(10000);
+ AddressSettings settings = new
AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true);
+
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ }
+
+ @Test
+ public void testAutoCreateDeleteRecreate() throws Exception {
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ int THREADS = 10;
+ for (int i = 0; i < 200; i++) {
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+
logger.debug("*******************************************************************************************************************************");
+ logger.debug("run " + i);
+ CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
+ CountDownLatch done = new CountDownLatch(THREADS);
+ Runnable consumerThread = () -> {
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ barrier.await(10, TimeUnit.SECONDS);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ };
+
+ Thread[] threads = new Thread[THREADS];
+ for (int j = 0; j < threads.length; j++) {
+ threads[j] = new Thread(consumerThread);
+ threads[j].start();
+ }
+
+ barrier.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("hello"));
+
+ Assert.assertNotNull(consumer.receive(5000));
+ }
+
+ }
+ }
+
+ @Test
+ public void testSweep() throws Exception {
+
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ }
+
+ AddressInfo info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
+ Assert.assertNotNull(info);
+ Assert.assertTrue(info.isAutoCreated());
+
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112"));
+ Assert.assertTrue("Queue name should be mentioned on logs",
AssertionLoggerHandler.findText(QUEUE_NAME));
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); // we
need another sweep to remove it
+ }
+
+ @Test
+ public void testSweepAddress() throws Exception {
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ AddressSettings settings = new
AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ server.start();
+ String ADDRESS_NAME = "autocreatedmulticast";
+
+ AddressInfo info = new
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
+ server.getPostOffice().addAddressInfo(info);
+ info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(ADDRESS_NAME);
+ session.createConsumer(topic);
+ }
+
+ { // just a namespace area
+ final AddressInfo infoRef = info;
+ Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
+ }
+
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Thread.sleep(50);
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Assert.assertTrue(info.isSwept());
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113"));
+ }
+
+
+ @Test
+ public void testNegativeSweepAddress() throws Exception {
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ AddressSettings settings = new
AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ server.start();
+ String ADDRESS_NAME = "autocreatedmulticast";
+
+ AddressInfo info = new
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
+ server.getPostOffice().addAddressInfo(info);
+ info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(ADDRESS_NAME);
+ session.createConsumer(topic);
+ }
+
+ { // just a namespace area
+ final AddressInfo infoRef = info;
+ Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
+ }
+
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Thread.sleep(50);
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Assert.assertTrue(info.isSwept());
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(ADDRESS_NAME);
+ session.createConsumer(topic);
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Assert.assertFalse(info.isSwept()); // it should be cleared because
there is a consumer now
Review comment:
You first did the reap before adding the consumer, which means it will
be set 'swept' then. The next reap is done after the consumer is added and
still present, so it will then be set 'not swept'. I'm questioning if
adding the consumer should itself make it not-swept immediately, i.e. before the
reaper runs. It seemed like it would still be marked 'swept' even though the
consumer is present, and that it would only be marked 'not swept' if the second
reap occurs while the consumer is still present, meaning if the consumer goes
away first it will be treated largely the same as if the consumer never existed
(delete-delay influence seeming the only difference).
E.g. consider a broker with the default 30sec reap config, meaning two unused
queues are seen and marked 'swept' at the first reap. These would be marked
'swept' initially at the same time, and so it seems like they could later be
removed at the same time even if one remained unused for those next 30s, while the
other actually got used [any number of times] much more recently by a new
consumer(s) that had been created but then also gone away again in between the 2
reaps.
As I say, I can see arguments both ways on this. If it is unused at the time
of the reap, and has still passed any delete-delay period (if it has one), then
removing it seems fair enough as it satisfies being unused at the time. However,
it still seems a little inconsistent (as one remained unused, while one was
used between the sweeps, meaning it is being removed much earlier in its unused
state than the other queue) and given that, I wonder what the point is of the
reaper ever needing to see it 'unused' twice before actually removing it? (I'm
guessing it was so as to not quickly remove unused queues with a 0 value for
delete-delay, which still looks to be the default).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 659561)
Time Spent: 3h 10m (was: 3h)
> Auto delete of a queue could lead to inconsistencies
> ----------------------------------------------------
>
> Key: ARTEMIS-3502
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3502
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Auto delete might happen after a disconnect of consumers.
> In a situation where the consumer was up to date, the queue could be removed,
> leading to a few issues.
> We should improve how we handle auto-delete and avoid these
> inconsistencies.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)