[
https://issues.apache.org/jira/browse/ARTEMIS-3502?focusedWorklogId=658876&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-658876
]
ASF GitHub Bot logged work on ARTEMIS-3502:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Oct/21 11:04
Start Date: 01/Oct/21 11:04
Worklog Time Spent: 10m
Work Description: gemmellr commented on a change in pull request #3777:
URL: https://github.com/apache/activemq-artemis/pull/3777#discussion_r720112248
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
##########
@@ -75,6 +76,14 @@
private StorageManager storageManager;
private HierarchicalRepositoryChangeListener repositoryChangeListener;
+ public boolean isSwept() {
+ return swept;
+ }
+
+ public void setSweep(boolean sweep) {
+ this.swept = sweep;
+ }
+
Review comment:
Given the way these are used, I think a single matching pair of names
would be nicer, i.e. isSwept + setSwept, property style.
setSweep reads more like an instruction on whether to include the object
in a sweep. It doesn't obviously follow that 'setting sweep' immediately
determines whether isSwept() returns true, which is how it is currently
used.
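A minimal sketch of the property-style pairing suggested above. The class name SweptFlagExample is a hypothetical stand-in used only for illustration; it is not the real AddressInfo or QueueImpl, and it carries nothing but the flag.

```java
/**
 * Hypothetical stand-in for a class that carries a "swept" flag.
 * It only illustrates the naming; it is not the Artemis AddressInfo.
 */
public class SweptFlagExample {

    // State named after the property: true once a sweep has marked this object.
    private boolean swept;

    /** Getter for the "swept" property. */
    public boolean isSwept() {
        return swept;
    }

    /** Setter with the same property name, so setSwept(true) implies isSwept() == true. */
    public void setSwept(boolean swept) {
        this.swept = swept;
    }

    public static void main(String[] args) {
        SweptFlagExample example = new SweptFlagExample();
        example.setSwept(true);
        System.out.println(example.isSwept());
    }
}
```

With both accessors sharing the name "swept", the setter reads as recording state rather than as a request to perform or schedule a sweep.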
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
##########
@@ -353,6 +355,15 @@ protected void repeatNextDelivery(MessageReference
reference) {
}
+ @Override
+ public boolean isSwept() {
+ return swept;
+ }
+
+ @Override
+ public void setSweep(boolean sweep) {
+ this.swept = sweep;
+ }
Review comment:
Same here
##########
File path:
artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java
##########
@@ -42,6 +42,7 @@ public void setUp() throws Exception {
super.setUp();
server = createServer();
+ server.getConfiguration().setAddressQueueScanPeriod(100);
Review comment:
This might be OK initially, but long term I think it would be much more
appropriate for the specific tests that verify the related behaviours
(and thus want rapid service) to set such config themselves, rather than
having it set low for all tests all the time, so that every test runs with
non-representative configuration even when it has no interest in that area.
That way they can also probably set it lower than 100ms, which is still quite
a long time for any non-trivial number of tests relying on the behaviour.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.logging.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AutoCreateTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(AutoCreateTest.class);
+
+ public final SimpleString addressA = new SimpleString("addressA");
+ public final SimpleString queueA = new SimpleString("queueA");
+
+ private ActiveMQServer server;
+
+ @After
+ public void clearLogg() {
+ AssertionLoggerHandler.stopCapture();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setAddressQueueScanPeriod(10000);
Review comment:
Following the prior comment about tests using non-typical setups...
I wouldn't expect a test class such as this to need to set this value when
the default is known to be 30 seconds. It has to because the value has been
set generally low and non-representative in the base class. Having the test
verify that the value in use is suitable would be entirely reasonable, but
having to actually change it to make it so, as in this case, seems off.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java
##########
@@ -0,0 +1,312 @@
+public class AutoCreateTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(AutoCreateTest.class);
+
+ public final SimpleString addressA = new SimpleString("addressA");
+ public final SimpleString queueA = new SimpleString("queueA");
+
+ private ActiveMQServer server;
+
+ @After
+ public void clearLogg() {
+ AssertionLoggerHandler.stopCapture();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setAddressQueueScanPeriod(10000);
+ AddressSettings settings = new
AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true);
+
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ }
+
+ @Test
+ public void testAutoCreateDeleteRecreate() throws Exception {
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ int THREADS = 10;
+ for (int i = 0; i < 200; i++) {
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+
logger.debug("*******************************************************************************************************************************");
+ logger.debug("run " + i);
+ CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
+ CountDownLatch done = new CountDownLatch(THREADS);
+ Runnable consumerThread = () -> {
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ barrier.await(10, TimeUnit.SECONDS);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ };
+
+ Thread[] threads = new Thread[THREADS];
+ for (int j = 0; j < threads.length; j++) {
+ threads[j] = new Thread(consumerThread);
+ threads[j].start();
+ }
Review comment:
Would reusing a single 10-thread executor and just submitting the lambda
repeatedly be nicer than needing the arrays and creating 2000 threads overall?
(As an aside, does it really need 200 iterations?)
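One way the executor suggestion could look, as a sketch only. The class name SharedExecutorSketch and the method runIterations are hypothetical, and the task passed in is a placeholder for the consumer-creating lambda in the test; no Artemis or JMS calls are made here. It keeps the barrier and latch from the quoted code but draws the workers from one fixed pool instead of starting new threads on every iteration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedExecutorSketch {

    static final int THREADS = 10;

    /**
     * Runs the task on THREADS pooled workers per iteration and returns how
     * many task executions failed. The same pool is reused for every iteration.
     */
    static int runIterations(int iterations, Runnable task) {
        AtomicInteger errors = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(THREADS);
        try {
            for (int i = 0; i < iterations; i++) {
                // THREADS workers plus the coordinating thread meet at the barrier.
                CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
                CountDownLatch done = new CountDownLatch(THREADS);
                for (int j = 0; j < THREADS; j++) {
                    executor.execute(() -> {
                        try {
                            barrier.await(10, TimeUnit.SECONDS);
                            task.run();
                        } catch (Throwable e) {
                            errors.incrementAndGet();
                        } finally {
                            done.countDown();
                        }
                    });
                }
                try {
                    barrier.await(10, TimeUnit.SECONDS);
                    if (!done.await(10, TimeUnit.SECONDS)) {
                        throw new IllegalStateException("workers did not finish in time");
                    }
                } catch (IllegalStateException e) {
                    throw e;
                } catch (Exception e) {
                    // Interrupted, broken barrier, or timeout on the coordinating thread.
                    throw new IllegalStateException(e);
                }
            }
        } finally {
            // Release the pooled threads once all iterations are complete.
            executor.shutdownNow();
        }
        return errors.get();
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        int errors = runIterations(5, runs::incrementAndGet);
        System.out.println("errors=" + errors + ", runs=" + runs.get());
    }
}
```

The pool size matches the number of workers that must reach the barrier together, so no submitted task waits in the queue behind one that is blocked on the barrier.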
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java
##########
@@ -0,0 +1,312 @@
+public class AutoCreateTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(AutoCreateTest.class);
+
+ public final SimpleString addressA = new SimpleString("addressA");
+ public final SimpleString queueA = new SimpleString("queueA");
+
+ private ActiveMQServer server;
+
+ @After
+ public void clearLogg() {
+ AssertionLoggerHandler.stopCapture();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setAddressQueueScanPeriod(10000);
+ AddressSettings settings = new
AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true);
+
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ }
+
+ @Test
+ public void testAutoCreateDeleteRecreate() throws Exception {
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ int THREADS = 10;
+ for (int i = 0; i < 200; i++) {
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+
logger.debug("*******************************************************************************************************************************");
+ logger.debug("run " + i);
+ CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
+ CountDownLatch done = new CountDownLatch(THREADS);
+ Runnable consumerThread = () -> {
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ barrier.await(10, TimeUnit.SECONDS);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ };
+
+ Thread[] threads = new Thread[THREADS];
+ for (int j = 0; j < threads.length; j++) {
+ threads[j] = new Thread(consumerThread);
+ threads[j].start();
+ }
+
+ barrier.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("hello"));
+
+ Assert.assertNotNull(consumer.receive(5000));
+ }
+
+ }
+ }
+
+ @Test
+ public void testSweep() throws Exception {
+
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ }
+
+ AddressInfo info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
+ Assert.assertNotNull(info);
+ Assert.assertTrue(info.isAutoCreated());
+
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112"));
+ Assert.assertTrue("Queue name should be mentioned on logs",
AssertionLoggerHandler.findText(QUEUE_NAME));
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); // we
need another sweep to remove it
+ }
+
+ @Test
+ public void testSweepAddress() throws Exception {
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ AddressSettings settings = new
AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ server.start();
+ String ADDRESS_NAME = "autocreatedmulticast";
Review comment:
As earlier comment.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java
##########
@@ -0,0 +1,312 @@
+public class AutoCreateTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(AutoCreateTest.class);
+
+ public final SimpleString addressA = new SimpleString("addressA");
+ public final SimpleString queueA = new SimpleString("queueA");
+
+ private ActiveMQServer server;
+
+ @After
+ public void clearLogg() {
+ AssertionLoggerHandler.stopCapture();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setAddressQueueScanPeriod(10000);
+ AddressSettings settings = new
AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true);
+
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ }
+
+ @Test
+ public void testAutoCreateDeleteRecreate() throws Exception {
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ int THREADS = 10;
+ for (int i = 0; i < 200; i++) {
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+
logger.debug("*******************************************************************************************************************************");
+ logger.debug("run " + i);
+ CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
+ CountDownLatch done = new CountDownLatch(THREADS);
+ Runnable consumerThread = () -> {
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ barrier.await(10, TimeUnit.SECONDS);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ };
+
+ Thread[] threads = new Thread[THREADS];
+ for (int j = 0; j < threads.length; j++) {
+ threads[j] = new Thread(consumerThread);
+ threads[j].start();
+ }
+
+ barrier.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("hello"));
+
+ Assert.assertNotNull(consumer.receive(5000));
+ }
+
+ }
+ }
+
+ @Test
+ public void testSweep() throws Exception {
+
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
Review comment:
As earlier comment.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java
##########
@@ -0,0 +1,312 @@
+public class AutoCreateTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(AutoCreateTest.class);
+
+ public final SimpleString addressA = new SimpleString("addressA");
+ public final SimpleString queueA = new SimpleString("queueA");
+
+ private ActiveMQServer server;
+
+ @After
+ public void clearLogg() {
+ AssertionLoggerHandler.stopCapture();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setAddressQueueScanPeriod(10000);
+ AddressSettings settings = new
AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true);
+
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ }
+
+ @Test
+ public void testAutoCreateDeleteRecreate() throws Exception {
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ int THREADS = 10;
+ for (int i = 0; i < 200; i++) {
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+
logger.debug("*******************************************************************************************************************************");
+ logger.debug("run " + i);
+ CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
+ CountDownLatch done = new CountDownLatch(THREADS);
+ Runnable consumerThread = () -> {
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ barrier.await(10, TimeUnit.SECONDS);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ };
+
+ Thread[] threads = new Thread[THREADS];
+ for (int j = 0; j < threads.length; j++) {
+ threads[j] = new Thread(consumerThread);
+ threads[j].start();
+ }
+
+ barrier.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("hello"));
+
+ Assert.assertNotNull(consumer.receive(5000));
+ }
+
+ }
+ }
+
+ @Test
+ public void testSweep() throws Exception {
+
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ }
+
+ AddressInfo info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
+ Assert.assertNotNull(info);
+ Assert.assertTrue(info.isAutoCreated());
+
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112"));
+ Assert.assertTrue("Queue name should be mentioned on logs",
AssertionLoggerHandler.findText(QUEUE_NAME));
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); // we
need another sweep to remove it
+ }
+
+ @Test
+ public void testSweepAddress() throws Exception {
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ AddressSettings settings = new
AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ server.start();
+ String ADDRESS_NAME = "autocreatedmulticast";
+
+ AddressInfo info = new
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
+ server.getPostOffice().addAddressInfo(info);
+ info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(ADDRESS_NAME);
+ session.createConsumer(topic);
+ }
+
+ { // just a namespace area
+ final AddressInfo infoRef = info;
+ Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
+ }
+
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Thread.sleep(50);
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Assert.assertTrue(info.isSwept());
Review comment:
Would be nice to assert it is false before this.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java
##########
@@ -0,0 +1,312 @@
+public class AutoCreateTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(AutoCreateTest.class);
+
+ public final SimpleString addressA = new SimpleString("addressA");
+ public final SimpleString queueA = new SimpleString("queueA");
+
+ private ActiveMQServer server;
+
+ @After
+ public void clearLogg() {
+ AssertionLoggerHandler.stopCapture();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setAddressQueueScanPeriod(10000);
+ AddressSettings settings = new
AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true);
+
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ }
+
+ @Test
+ public void testAutoCreateDeleteRecreate() throws Exception {
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ int THREADS = 10;
+ for (int i = 0; i < 200; i++) {
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+
logger.debug("*******************************************************************************************************************************");
+ logger.debug("run " + i);
+ CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
+ CountDownLatch done = new CountDownLatch(THREADS);
+ Runnable consumerThread = () -> {
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ barrier.await(10, TimeUnit.SECONDS);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ };
+
+ Thread[] threads = new Thread[THREADS];
+ for (int j = 0; j < threads.length; j++) {
+ threads[j] = new Thread(consumerThread);
+ threads[j].start();
+ }
+
+ barrier.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("hello"));
+
+ Assert.assertNotNull(consumer.receive(5000));
+ }
+
+ }
+ }
+
+ @Test
+ public void testSweep() throws Exception {
+
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ }
+
+ AddressInfo info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
+ Assert.assertNotNull(info);
+ Assert.assertTrue(info.isAutoCreated());
+
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112"));
+ Assert.assertTrue("Queue name should be mentioned on logs",
AssertionLoggerHandler.findText(QUEUE_NAME));
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); // we
need another sweep to remove it
+ }
+
+ @Test
+ public void testSweepAddress() throws Exception {
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ AddressSettings settings = new
AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ server.start();
+ String ADDRESS_NAME = "autocreatedmulticast";
+
+ AddressInfo info = new
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
+ server.getPostOffice().addAddressInfo(info);
+ info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(ADDRESS_NAME);
+ session.createConsumer(topic);
+ }
+
+ { // just a namespace area
+ final AddressInfo infoRef = info;
+ Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
+ }
+
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Thread.sleep(50);
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Assert.assertTrue(info.isSwept());
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113"));
+ }
+
+
+ @Test
+ public void testNegativeSweepAddress() throws Exception {
+ AssertionLoggerHandler.startCapture();
+ server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling
scanner, we will perform it manually
+ AddressSettings settings = new
AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
+ server.getConfiguration().getAddressesSettings().clear();
+ server.getConfiguration().getAddressesSettings().put("#", settings);
+ server.start();
+ String ADDRESS_NAME = "autocreatedmulticast";
+
+ AddressInfo info = new
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST).setAutoCreated(true);
+ server.getPostOffice().addAddressInfo(info);
+ info =
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME));
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(ADDRESS_NAME);
+ session.createConsumer(topic);
+ }
+
+ { // just a namespace area
+ final AddressInfo infoRef = info;
+ Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
+ }
+
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Thread.sleep(50);
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Assert.assertTrue(info.isSwept());
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(ADDRESS_NAME);
+ session.createConsumer(topic);
+ PostOfficeTestAccessor.reapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
+ Assert.assertFalse(info.isSwept()); // it should be cleared because
there is a consumer now
Review comment:
Question, should it be false *before* you run the reaper?
Right now it looks like if consumers come in and leave between an initial
reap where it was set true, and then a second reap later, it would still be
removed as it was initially set swept. Should it be? I could see arguments both
ways.
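
To make the scenario concrete, here is a minimal, self-contained sketch of the two-pass mark-and-remove logic as it appears in the quoted diff. It is only a model: `SweepSketch`, `Entry`, and the `consumers` counter are hypothetical stand-ins and not the real `AddressInfo` or `PostOfficeImpl` code, and it assumes that nothing outside the reaper touches the swept flag.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SweepSketch {
    // Hypothetical stand-in for an auto-created address (not the real AddressInfo).
    static class Entry {
        final String name;
        boolean swept;   // models the swept flag under review
        int consumers;   // models "the address is currently in use"

        Entry(String name) {
            this.name = name;
        }
    }

    // One reaper pass: an idle entry is marked the first time it is seen idle
    // and removed the next time; an entry found in use has its mark cleared.
    static List<String> reap(List<Entry> entries) {
        List<String> removed = new ArrayList<>();
        Iterator<Entry> it = entries.iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.consumers == 0) {
                if (e.swept) {
                    it.remove();
                    removed.add(e.name);
                } else {
                    e.swept = true;
                }
            } else {
                e.swept = false;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        List<Entry> entries = new ArrayList<>();
        Entry e = new Entry("autocreatedmulticast");
        entries.add(e);

        reap(entries);   // first pass only marks the entry
        e.consumers++;   // a consumer attaches between the passes...
        e.consumers--;   // ...and leaves again before the next pass
        // The flag was never cleared, so the second pass removes the entry.
        System.out.println(reap(entries));
    }
}
```

In this model the second pass removes the entry even though it was used in between, because only the reaper ever resets the flag. Clearing the flag when a consumer attaches would be the alternative behaviour.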
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -1843,47 +1843,74 @@ public void run() {
@Override
public void run() {
- getLocalQueues().forEach(queue -> {
- if (!queue.isInternalQueue() &&
QueueManagerImpl.isAutoDelete(queue) &&
QueueManagerImpl.consumerCountCheck(queue) &&
QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue)
&& queueWasUsed(queue)) {
+ reapAddresses();
+ }
+ }
+
+ private boolean queueWasUsed(Queue queue) {
+ return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged()
> 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() !=
-1;
+ }
+
+ /** To be used by the AddressQueueReaper */
Review comment:
Would probably remove (or change) the comment since it is also exposed
for and used by tests.
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -1843,47 +1843,74 @@ public void run() {
@Override
public void run() {
- getLocalQueues().forEach(queue -> {
- if (!queue.isInternalQueue() &&
QueueManagerImpl.isAutoDelete(queue) &&
QueueManagerImpl.consumerCountCheck(queue) &&
QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue)
&& queueWasUsed(queue)) {
+ reapAddresses();
+ }
+ }
+
+ private boolean queueWasUsed(Queue queue) {
Review comment:
Can it be static final?
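
A rough sketch of what I mean, assuming the check reads nothing but the queue's own counters. `QueueUsageSketch`, `QueueStats`, and `stats(...)` are hypothetical names used only so the example compiles on its own; the real method takes the Artemis `Queue`.

```java
class QueueUsageSketch {
    // Hypothetical minimal view of the counters the check reads.
    interface QueueStats {
        long getMessagesExpired();
        long getMessagesAcknowledged();
        long getMessagesKilled();
        long getConsumerRemovedTimestamp();
    }

    // No instance state is used, so the helper can be static.
    // It is package-private here only so the sketch can be called from outside.
    static boolean queueWasUsed(QueueStats queue) {
        return queue.getMessagesExpired() > 0
            || queue.getMessagesAcknowledged() > 0
            || queue.getMessagesKilled() > 0
            || queue.getConsumerRemovedTimestamp() != -1;
    }

    // Hypothetical factory for a fixed set of counter values.
    static QueueStats stats(long expired, long acked, long killed, long removedTimestamp) {
        return new QueueStats() {
            @Override
            public long getMessagesExpired() {
                return expired;
            }

            @Override
            public long getMessagesAcknowledged() {
                return acked;
            }

            @Override
            public long getMessagesKilled() {
                return killed;
            }

            @Override
            public long getConsumerRemovedTimestamp() {
                return removedTimestamp;
            }
        };
    }

    public static void main(String[] args) {
        // A queue that never had traffic or a consumer is not considered used.
        System.out.println(queueWasUsed(stats(0, 0, 0, -1)));
    }
}
```

If the method stays private, `final` adds nothing, since a private method cannot be overridden anyway; `static` alone would be enough.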
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -1843,47 +1843,74 @@ public void run() {
@Override
public void run() {
- getLocalQueues().forEach(queue -> {
- if (!queue.isInternalQueue() &&
QueueManagerImpl.isAutoDelete(queue) &&
QueueManagerImpl.consumerCountCheck(queue) &&
QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue)
&& queueWasUsed(queue)) {
+ reapAddresses();
+ }
+ }
+
+ private boolean queueWasUsed(Queue queue) {
+ return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged()
> 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() !=
-1;
+ }
+
+ /** To be used by the AddressQueueReaper */
+ void reapAddresses() {
+ getLocalQueues().forEach(queue -> {
+ if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue)
&& QueueManagerImpl.consumerCountCheck(queue) &&
QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue)
&& queueWasUsed(queue)) {
+ if (queue.isSwept()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removing queue " + queue.getName() + " after
it being swept twice on reaping process");
+ }
QueueManagerImpl.performAutoDeleteQueue(server, queue);
+ } else {
+ queue.setSweep(true);
}
- });
+ } else {
+ queue.setSweep(false);
+ }
+ });
- Set<SimpleString> addresses = addressManager.getAddresses();
+ Set<SimpleString> addresses = addressManager.getAddresses();
- for (SimpleString address : addresses) {
- AddressInfo addressInfo = getAddressInfo(address);
- AddressSettings settings =
addressSettingsRepository.getMatch(address.toString());
+ for (SimpleString address : addresses) {
+ AddressInfo addressInfo = getAddressInfo(address);
+ AddressSettings settings =
addressSettingsRepository.getMatch(address.toString());
- try {
- if (settings.isAutoDeleteAddresses() && addressInfo != null &&
addressInfo.isAutoCreated() && !isAddressBound(address) &&
addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() -
addressInfo.getBindingRemovedTimestamp() >=
settings.getAutoDeleteAddressesDelay())) {
+ try {
+ if (addressManager.checkAutoRemoveAddress(address, addressInfo,
settings)) {
+ if (addressInfo.isSwept()) {
- if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQServerLogger.LOGGER.debug("deleting auto-created
address \"" + address + ".\"");
+ server.autoRemoveAddressInfo(address, null);
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sweeping address " + address);
}
-
- server.removeAddressInfo(address, null);
+ addressInfo.setSweep(true);
+ }
+ } else {
+ if (addressInfo != null) {
+ addressInfo.setSweep(false);
}
- } catch (ActiveMQShutdownException e) {
+ }
+ } catch (ActiveMQShutdownException e) {
+ // the address and queue reaper is asynchronous so it may happen
+ // that the broker is shutting down while the reaper iterates
+ // through the addresses, next restart this operation will be
retried
+ logger.debug(e.getMessage(), e);
+ } catch (Exception e) {
+ if (e instanceof ActiveMQAddressDoesNotExistException &&
getAddressInfo(address) == null) {
// the address and queue reaper is asynchronous so it may happen
- // that the broker is shutting down while the reaper iterates
- // through the addresses, next restart this operation will be
retried
+ // that the address is removed before the reaper removes it
logger.debug(e.getMessage(), e);
- } catch (Exception e) {
- if (e instanceof ActiveMQAddressDoesNotExistException &&
getAddressInfo(address) == null) {
- // the address and queue reaper is asynchronous so it may
happen
- // that the address is removed before the reaper removes it
- logger.debug(e.getMessage(), e);
- } else {
- ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e,
address);
- }
+ } else {
+ ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e,
address);
Review comment:
The logger call is wrong: it reports an error removing a queue, but this
is the address reaping section.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateTest.java
##########
@@ -0,0 +1,312 @@
+ @Test
+ public void testAutoCreateDeleteRecreate() throws Exception {
+ server.start();
+ String QUEUE_NAME = "autoCreateAndRecreate";
Review comment:
Would be nice to use the exact test name. You also don't need to hard-code
it; there's a method to get it, e.g. something like:
final String QUEUE_NAME = getName();
Issue Time Tracking
-------------------
Worklog Id: (was: 658876)
Time Spent: 40m (was: 0.5h)
> Auto delete of a queue could lead to inconsistencies
> ----------------------------------------------------
>
> Key: ARTEMIS-3502
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3502
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Auto-delete might happen after consumers disconnect.
> In a situation where the consumer was up to date, the queue could be removed,
> leading to a few issues.
> We should improve how we handle auto-delete and avoid these
> inconsistencies.