[ 
https://issues.apache.org/jira/browse/ARTEMIS-3502?focusedWorklogId=659561&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-659561
 ]

ASF GitHub Bot logged work on ARTEMIS-3502:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Oct/21 13:29
            Start Date: 04/Oct/21 13:29
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on a change in pull request #3777:
URL: https://github.com/apache/activemq-artemis/pull/3777#discussion_r721365734



##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.logging.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AutoCreateTest extends ActiveMQTestBase {
+   private static final Logger logger = Logger.getLogger(AutoCreateTest.class);
+
+   public final SimpleString addressA = new SimpleString("addressA");
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   private ActiveMQServer server;
+
+   @After
+   public void clearLogg() {
+      AssertionLoggerHandler.stopCapture();
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      server.getConfiguration().setAddressQueueScanPeriod(10000);
+      AddressSettings settings = new 
AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true);
+
+      server.getConfiguration().getAddressesSettings().clear();
+      server.getConfiguration().getAddressesSettings().put("#", settings);
+   }
+
+   @Test
+   public void testAutoCreateDeleteRecreate() throws Exception {
+      server.start();
+      String QUEUE_NAME = "autoCreateAndRecreate";
+
+      AtomicInteger errors = new AtomicInteger(0);
+
+      int THREADS = 10;
+      for (int i = 0; i < 200; i++) {
+         ConnectionFactory cf = CFUtil.createConnectionFactory("core", 
"tcp://localhost:61616");
+         
logger.debug("*******************************************************************************************************************************");
+         logger.debug("run " + i);
+         CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
+         CountDownLatch done = new CountDownLatch(THREADS);
+         Runnable consumerThread = () -> {
+            try (Connection connection = cf.createConnection()) {
+               Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+               barrier.await(10, TimeUnit.SECONDS);
+               Queue queue = session.createQueue(QUEUE_NAME);
+               MessageConsumer consumer = session.createConsumer(queue);
+               connection.start();
+            } catch (Throwable e) {
+               e.printStackTrace(System.out);
+               errors.incrementAndGet();
+            } finally {
+               done.countDown();
+            }
+         };
+
+         Thread[] threads = new Thread[THREADS];
+         for (int j = 0; j < threads.length; j++) {
+            threads[j] = new Thread(consumerThread);
+            threads[j].start();
+         }
+
+         barrier.await(10, TimeUnit.SECONDS);
+         Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+         Assert.assertEquals(0, errors.get());
+
+         try (Connection connection = cf.createConnection()) {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(QUEUE_NAME);
+            MessageConsumer consumer = session.createConsumer(queue);
+            connection.start();
+
+            MessageProducer producer = session.createProducer(queue);
+            producer.send(session.createTextMessage("hello"));
+
+            Assert.assertNotNull(consumer.receive(5000));
+         }
+
+      }
+   }
+
+   @Test
+   public void testSweep() throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling 
scanner, we will perform it manually
+      server.start();
+      String QUEUE_NAME = "autoCreateAndRecreate";
+
+      ConnectionFactory cf = CFUtil.createConnectionFactory("core", 
"tcp://localhost:61616");
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(QUEUE_NAME);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+      }
+
+      AddressInfo info = 
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
+      Assert.assertNotNull(info);
+      Assert.assertTrue(info.isAutoCreated());
+
+      PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) 
server.getPostOffice());
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
+      PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) 
server.getPostOffice());
+      Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112"));
+      Assert.assertTrue("Queue name should be mentioned on logs", 
AssertionLoggerHandler.findText(QUEUE_NAME));
+      PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) 
server.getPostOffice());
+      Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); // we 
need another sweep to remove it
+   }
+
+   @Test
+   public void testSweepAddress() throws Exception {
+      AssertionLoggerHandler.startCapture();
+      server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling 
scanner, we will perform it manually
+      AddressSettings settings = new 
AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
+      server.getConfiguration().getAddressesSettings().clear();
+      server.getConfiguration().getAddressesSettings().put("#", settings);
+      server.start();
+      String ADDRESS_NAME = "autocreatedmulticast";
+
+      AddressInfo info = new 
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
+      server.getPostOffice().addAddressInfo(info);
+      info = 
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
+
+      ConnectionFactory cf = CFUtil.createConnectionFactory("core", 
"tcp://localhost:61616");
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(ADDRESS_NAME);
+         session.createConsumer(topic);
+      }
+
+      { // just a namespace area
+         final AddressInfo infoRef = info;
+         Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
+      }
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+      Thread.sleep(50);
+      PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) 
server.getPostOffice());
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+      Assert.assertTrue(info.isSwept());
+      PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) 
server.getPostOffice());
+      Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113"));
+   }
+
+
+   @Test
+   public void testNegativeSweepAddress() throws Exception {
+      AssertionLoggerHandler.startCapture();
+      server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling 
scanner, we will perform it manually
+      AddressSettings settings = new 
AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
+      server.getConfiguration().getAddressesSettings().clear();
+      server.getConfiguration().getAddressesSettings().put("#", settings);
+      server.start();
+      String ADDRESS_NAME = "autocreatedmulticast";
+
+      AddressInfo info = new 
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
+      server.getPostOffice().addAddressInfo(info);
+      info = 
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
+
+      ConnectionFactory cf = CFUtil.createConnectionFactory("core", 
"tcp://localhost:61616");
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(ADDRESS_NAME);
+         session.createConsumer(topic);
+      }
+
+      { // just a namespace area
+         final AddressInfo infoRef = info;
+         Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
+      }
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+      Thread.sleep(50);
+      PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) 
server.getPostOffice());
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+      Assert.assertTrue(info.isSwept());
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(ADDRESS_NAME);
+         session.createConsumer(topic);
+         PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) 
server.getPostOffice());
+         Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+         Assert.assertFalse(info.isSwept()); // it should be cleared because 
there is a consumer now

Review comment:
       You first did the reap before adding the consumer, which means it will 
be set 'swept' then. Then the next reap is done after the consumer is added and 
still present, so it will then be set 'not swept' then. I'm questioning if 
adding the consumer should itself make it not-swept immediately, i.e before the 
reaper run. It seemed like it would still be marked 'swept' even though the 
consumer is present, and that it would only be marked 'not swept' if the second 
reap occurs while the consumer is still present, meaning if the consumer goes 
away first it will be treated largely the same as if the consumer never existed 
(delete-delay influence seeming the only difference)
   
   E.g consider a broker with the default 30sec reap config, meaning two unused 
queues are seen and marked 'swept' at the first reap. These would be be marked 
'swept' initially at the same time, and so seems like they could later removed 
be at the same time even if one remained unused for those next 30s, while the 
other actually got used [any number of times] much more recently by a new 
consumer(s) that had been created but then also gone away again inbetween the 2 
reaps.
   
   As I say, I can see arguments both ways on this. If it is unused at the time 
of the reap, and has still passed any delete-delay period (if it has one), then 
removing it seems fair enough as it satifies being unused at the time. However, 
it still seems a little inconsistent (as one remained unused, while one was 
used between the sweeps, meaning its being removed much earlier in its unused 
state than the other queue) and given that, I wonder what the point is of the 
reaper ever needing to see it 'unused' twice before actually removing it? (I'm 
guessing it was so as to not quickly remove unused queues with a 0 value for 
delete-delay, which still looks to be the default).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 659561)
    Time Spent: 3h 10m  (was: 3h)

> Auto delete of a queue could lead to inconsistencies
> ----------------------------------------------------
>
>                 Key: ARTEMIS-3502
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3502
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Auto delete might happen after a disconnect of consumers.
> on a situation where the consumer was up to date the queue could be removed, 
> leading to a few issues.
> we should improve how we handle with auto-delete and avoid these 
> inconsistencies.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to