Repository: activemq-artemis Updated Branches: refs/heads/master 35415510d -> 57038ff47
ARTEMIS-922 implement purge semantics Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/17528141 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/17528141 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/17528141 Branch: refs/heads/master Commit: 1752814197549e7e91aeb8fe72488736ec1fb91a Parents: 3541551 Author: Justin Bertram <[email protected]> Authored: Thu Jan 19 10:36:26 2017 -0600 Committer: Justin Bertram <[email protected]> Committed: Tue Jan 31 08:37:05 2017 -0600 ---------------------------------------------------------------------- .../core/server/ActiveMQServerLogger.java | 4 + .../core/server/AutoCreatedQueueManager.java | 25 ------ .../artemis/core/server/QueueManager.java | 25 ++++++ .../core/server/impl/ActiveMQServerImpl.java | 4 +- .../impl/AutoCreatedQueueManagerImpl.java | 77 ------------------ .../server/impl/PostOfficeJournalLoader.java | 2 +- .../artemis/core/server/impl/QueueImpl.java | 8 +- .../core/server/impl/QueueManagerImpl.java | 86 ++++++++++++++++++++ .../integration/addressing/AddressingTest.java | 35 ++++---- .../amqp/ClientDefinedMultiConsumerTest.java | 22 ++++- 10 files changed, 152 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index b52ed24..c365b7d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1558,4 +1558,8 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT) void invalidMessageCounterPeriod(long value); + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 224073, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT) + void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java deleted file mode 100644 index 5af5c0e..0000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.server; - -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.utils.ReferenceCounter; - -public interface AutoCreatedQueueManager extends ReferenceCounter { - - SimpleString getQueueName(); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java new file mode 100644 index 0000000..a847757 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.utils.ReferenceCounter; + +public interface QueueManager extends ReferenceCounter { + + SimpleString getQueueName(); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 5c392cc..7575100 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2472,8 +2472,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (transientQueue) { queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); - } else if (queue.isAutoCreated()) { - queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queue.getName())); + } else { + queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName())); } final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java deleted file mode 100644 index fd89a94..0000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.server.impl; - -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager; -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.utils.ReferenceCounterUtil; - -public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager { - - private final SimpleString queueName; - - private final ActiveMQServer server; - - private final Runnable runnable = new Runnable() { - @Override - public void run() { - Queue queue = server.locateQueue(queueName); - SimpleString address = queue.getAddress(); - AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); - long consumerCount = queue.getConsumerCount(); - long messageCount = queue.getMessageCount(); - - if (((queue.isAutoCreated() && settings.isAutoDeleteQueues()) || queue.isPurgeOnNoConsumers()) && queue.getMessageCount() == 0) { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues()); - } - - try { - server.destroyQueue(queueName, null, true, false); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName); - } - } - } - }; - - private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable); - - public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) { - this.server = server; - this.queueName = queueName; - } - - @Override - public int increment() { - return referenceCounterUtil.increment(); - } - - @Override - public int decrement() { - return referenceCounterUtil.decrement(); - } - - @Override - public SimpleString getQueueName() { - return queueName; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 65de0c9..a8f2d85 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -153,7 +153,7 @@ public class PostOfficeJournalLoader implements JournalLoader { .maxConsumers(queueBindingInfo.getMaxConsumers()) .routingType(RoutingType.getType(queueBindingInfo.getRoutingType())); final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); - queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName())); + queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName())); if (queueBindingInfo.getQueueStatusEncodings() != null) { for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 6834bb4..8242760 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -851,13 +851,7 @@ public class QueueImpl implements Queue { refCountForConsumers.decrement(); } - if (noConsumers.decrementAndGet() == 0 && purgeOnNoConsumers) { - try { - deleteQueue(); - } catch (Exception e) { - logger.error("Error deleting queue on no consumers. " + this.toString(), e); - } - } + noConsumers.decrementAndGet(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java new file mode 100644 index 0000000..692eba7 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.QueueManager; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.utils.ReferenceCounterUtil; + +public class QueueManagerImpl implements QueueManager { + + private final SimpleString queueName; + + private final ActiveMQServer server; + + private final Runnable runnable = new Runnable() { + @Override + public void run() { + Queue queue = server.locateQueue(queueName); + SimpleString address = queue.getAddress(); + AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); + long consumerCount = queue.getConsumerCount(); + long messageCount = queue.getMessageCount(); + + if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0) { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues()); + } + + try { + server.destroyQueue(queueName, null, true, false); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName); + } + } else if (queue.isPurgeOnNoConsumers()) { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount); + } + try { + queue.deleteAllReferences(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName); + } + } + } + }; + + private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable); + + public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) { + this.server = server; + this.queueName = queueName; + } + + @Override + public int increment() { + return referenceCounterUtil.increment(); + } + + @Override + public int decrement() { + return referenceCounterUtil.decrement(); + } + + @Override + public SimpleString getQueueName() { + return queueName; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java index 98957e2..0eb5f32 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java @@ -224,37 +224,30 @@ public class AddressingTest extends ActiveMQTestBase { @Test public void testPurgeOnNoConsumersTrue() throws Exception { - SimpleString address = new SimpleString("test.address"); SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); - // For each address, create 2 Queues with the same address, assert both queues receive message - boolean purgeOnNoConsumers = true; - Queue q1 = server.createQueue(address, RoutingType.MULTICAST, queueName, null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, purgeOnNoConsumers, true); - + server.createQueue(address, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, 1, true, true); + assertNotNull(server.locateQueue(queueName)); ClientSession session = sessionFactory.createSession(); - session.start(); - - ClientConsumer consumer1 = session.createConsumer(q1.getName()); - consumer1.close(); - - assertFalse(server.queueQuery(queueName).isExists()); + ClientProducer producer = session.createProducer(address); + producer.send(session.createMessage(true)); + session.createConsumer(queueName).close(); + assertNotNull(server.locateQueue(queueName)); + assertEquals(0, server.locateQueue(queueName).getMessageCount()); } @Test public void testPurgeOnNoConsumersFalse() throws Exception { SimpleString address = new SimpleString("test.address"); SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); - // For each address, create 2 Queues with the same address, assert both queues receive message - boolean purgeOnNoConsumers = false; - Queue q1 = server.createQueue(address,RoutingType.MULTICAST, queueName, null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, purgeOnNoConsumers, true); - + server.createQueue(address, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, 1, false, true); + assertNotNull(server.locateQueue(queueName)); ClientSession session = sessionFactory.createSession(); - session.start(); - - ClientConsumer consumer1 = session.createConsumer(q1.getName()); - consumer1.close(); - - assertTrue(server.queueQuery(queueName).isExists()); + ClientProducer producer = session.createProducer(address); + producer.send(session.createMessage(true)); + session.createConsumer(queueName).close(); + assertNotNull(server.locateQueue(queueName)); + assertEquals(1, server.locateQueue(queueName).getMessageCount()); } @Test http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java index 9406295..2823983 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -61,7 +62,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); receiver2.close(); //check its been deleted - assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; + } + }, 1000); connection.close(); } @@ -117,7 +123,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); //check its been deleted connection.close(); - assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; + } + }, 1000); } @Test(timeout = 60000) @@ -144,7 +155,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); receiver2.close(); //check its been deleted - assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null; + } + }, 1000); connection.close(); }
