This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new 3f553ff QPID-8563: [Broker-J] Purge all queues (#109) 3f553ff is described below commit 3f553ffba38ec70793506e7826b3bdad5342c05e Author: Marek Laca <mkl...@users.noreply.github.com> AuthorDate: Fri Oct 8 08:07:34 2021 +0200 QPID-8563: [Broker-J] Purge all queues (#109) --- .../server/virtualhost/AbstractVirtualHost.java | 108 +++++++++++++++- .../virtualhost/QueueManagingVirtualHost.java | 6 + .../virtualhost/AbstractVirtualHostTest.java | 137 ++++++++++++++++++++- .../resources/js/qpid/management/VirtualHost.js | 35 ++++++ .../src/main/java/resources/showVirtualHost.html | 1 + 5 files changed, 285 insertions(+), 2 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 0b1d1de..09c4fc1 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -100,7 +100,35 @@ import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.internal.InternalMessage; -import org.apache.qpid.server.model.*; +import org.apache.qpid.server.model.AbstractConfigurationChangeListener; +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfigurationExtractor; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Content; +import org.apache.qpid.server.model.CustomRestHeaders; +import org.apache.qpid.server.model.DoOnConfigThread; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.ManageableMessage; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.NoFactoryForTypeException; +import org.apache.qpid.server.model.NotFoundException; +import org.apache.qpid.server.model.Param; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.RestContentHeader; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.model.SystemConfig; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostAccessControlProvider; +import org.apache.qpid.server.model.VirtualHostLogger; +import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.model.preferences.Preference; import org.apache.qpid.server.model.preferences.UserPreferences; @@ -3482,4 +3510,82 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte inMemoryMessageSize); } } + + @Override + public long clearMatchingQueues(String queueNamePattern) + { + LOGGER.debug("Clearing the queues with name that matches the pattern: '{}'", queueNamePattern); + try + { + final Pattern pattern = Pattern.compile(queueNamePattern); + + long count = 0; + for (final Queue<?> queue : getChildren(Queue.class)) + { + if (pattern.matcher(queue.getName()).matches()) + { + LOGGER.debug("Clearing the queue with name '{}' and ID '{}'", queue.getName(), queue.getId()); + count += queue.clearQueue(); + } + } + return count; + } + catch (PatternSyntaxException e) + { + final String message = String.format("Failed to compile queue name pattern: '%s'", queueNamePattern); + LOGGER.debug(message, e); + throw new IllegalArgumentException(message, e); + } + } + + @Override + public long clearQueues(Collection<String> queues) + { + final Map<UUID, String> uuid = new HashMap<>(); + final Set<String> names = new HashSet<>(); + for (final String id : queues) + { + try + { + uuid.put(UUID.fromString(id), id); + } + catch (IllegalArgumentException e) + { + LOGGER.trace(String.format("'%s' is not a valid queue ID", id), e); + names.add(id); + } + } + + final Collection<Queue> queueList = getChildren(Queue.class); + long count = 0; + + if (!uuid.isEmpty()) + { + LOGGER.debug("Clearing the queues with IDs: {}", uuid.values()); + for (final Queue<?> queue : queueList) + { + if (uuid.remove(queue.getId()) != null) + { + LOGGER.debug("Clearing the queue with ID '{}'", queue.getId()); + count += queue.clearQueue(); + } + } + names.addAll(uuid.values()); + } + + if (!names.isEmpty()) + { + LOGGER.debug("Clearing the queues with names: {}", names); + for (final Queue<?> queue : queueList) + { + if (names.contains(queue.getName())) + { + LOGGER.debug("Clearing the queue with name '{}'", queue.getName()); + count += queue.clearQueue(); + } + } + } + + return count; + } } diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java index 690c076..003b4d1 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java @@ -343,6 +343,12 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>> @Param(name = "role", description = "whether to remove only sending links (\"SENDER\"), receiving links (\"RECEIVER\") or both (\"BOTH\")", validValues = {"SENDER", "RECEIVER", "BOTH"}, defaultValue = "BOTH") String role, @Param(name = "linkNamePattern", description = "Regular Expression to match the link names to be removed.", defaultValue = ".*") String linkNamePattern); + @ManagedOperation(description = "Purge every queue with the name that matches the regular expression.", changesConfiguredObjectState = false) + long clearMatchingQueues(@Param(name = "queueNamePattern", description = "Regular Expression to match the queue name.", defaultValue = ".*") String queueNamePattern); + + @ManagedOperation(description = "Purge queues in provided list.", changesConfiguredObjectState = false) + long clearQueues(@Param(name = "queues", description = "Collection of queue IDs or names.") Collection<String> queues); + Queue<?> getSubscriptionQueue(final String exchangeName, final Map<String, Object> attributes, final Map<String, Map<String, Object>> bindings); diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java index 00f1785..b22555d 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.virtualhost; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.any; @@ -33,8 +34,13 @@ import static org.mockito.Mockito.when; import java.io.File; import java.security.Principal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -46,6 +52,7 @@ import ch.qos.logback.core.spi.FilterReply; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,13 +64,13 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.BrokerTestHelper; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.SystemConfig; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.security.AccessControl; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.preferences.PreferenceStore; import org.apache.qpid.server.util.FileUtils; @@ -350,4 +357,132 @@ public class AbstractVirtualHostTest extends UnitTestBase action.run(); assertTrue("Did not receive expected log message", logMessageReceivedLatch.await(2, TimeUnit.SECONDS)); } + + @Test + public void testClearMatchingQueues() + { + final List<Queue> queues = new ArrayList<>(); + final Queue<?> queueA = newQueue("queueA"); + queues.add(queueA); + final Queue<?> topic = newQueue("queue-topic"); + queues.add(topic); + final Queue<?> queueB = newQueue("queueB"); + queues.add(queueB); + + newVirtualHost(queues).clearMatchingQueues("queue.?"); + Mockito.verify(queueA).clearQueue(); + Mockito.verify(queueB).clearQueue(); + Mockito.verify(topic, Mockito.never()).clearQueue(); + } + + @Test + public void testClearMatchingQueues_any() + { + final List<Queue> queues = new ArrayList<>(); + queues.add(newQueue("queueA")); + queues.add(newQueue("queue-topic")); + queues.add(newQueue("queueB")); + + newVirtualHost(queues).clearMatchingQueues(".*"); + for (final Queue<?> queue : queues) + { + Mockito.verify(queue).clearQueue(); + } + } + + @Test + public void testClearMatchingQueues_exception() + { + final List<Queue> queues = new ArrayList<>(); + queues.add(newQueue("queueA")); + queues.add(newQueue("queue-topic")); + queues.add(newQueue("queueB")); + + final AbstractVirtualHost host = newVirtualHost(queues); + try + { + host.clearMatchingQueues(".*["); + fail("An exception is expected!"); + } + catch (RuntimeException e) + { + assertNotNull(e.getMessage()); + } + for (final Queue<?> queue : queues) + { + Mockito.verify(queue, Mockito.never()).clearQueue(); + } + } + + @Test + public void testClearQueues() + { + final List<Queue> queues = new ArrayList<>(); + final Queue<?> queueA = newQueue("queueA"); + queues.add(queueA); + final Queue<?> topic = newQueue("queue-topic"); + queues.add(topic); + final Queue<?> queueB = newQueue("queueB"); + queues.add(queueB); + + newVirtualHost(queues).clearQueues(Arrays.asList("queue-topic", queueB.getId().toString())); + Mockito.verify(queueA, Mockito.never()).clearQueue(); + Mockito.verify(queueB).clearQueue(); + Mockito.verify(topic).clearQueue(); + } + + @Test + public void testClearQueues_none() + { + final List<Queue> queues = new ArrayList<>(); + queues.add(newQueue("queueA")); + queues.add(newQueue("queue-topic")); + queues.add(newQueue("queueB")); + + newVirtualHost(queues).clearQueues(Collections.emptySet()); + for (final Queue<?> queue : queues) + { + Mockito.verify(queue, Mockito.never()).clearQueue(); + } + } + + private AbstractVirtualHost newVirtualHost(List<Queue> queues) + { + final Map<String, Object> attributes = Collections.singletonMap(AbstractVirtualHost.NAME, getTestName()); + final MessageStore store = mock(MessageStore.class); + return new AbstractVirtualHost(attributes, _node) + { + @Override + protected MessageStore createMessageStore() + { + return store; + } + + @Override + public Collection getChildren(Class clazz) + { + if (clazz == Queue.class) + { + return queues; + } + else + { + return super.getChildren(clazz); + } + + } + }; + } + + private Queue<?> newQueue(final String name) + { + final Queue<?> queue = Mockito.mock(Queue.class); + Mockito.doReturn(name).when(queue).getName(); + + final UUID uuid = UUID.randomUUID(); + Mockito.doReturn(uuid).when(queue).getId(); + + Mockito.doReturn(Queue.class).when(queue).getCategoryClass(); + return queue; + } } diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js index c19a9f8..8d7ef9a 100644 --- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js +++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js @@ -114,6 +114,12 @@ define(["dojo/parser", "delete", "queue"); }); + const clearQueueButton = query(".clearQueueButton", containerNode)[0]; + registry.byNode(clearQueueButton).on("click", function (evt) + { + that._clearQueues(that.vhostUpdater.queuesGrid); + }); + var addExchangeButton = query(".addExchangeButton", containerNode)[0]; registry.byNode(addExchangeButton).on("click", function (evt) { @@ -286,6 +292,35 @@ define(["dojo/parser", return confirmed; }; + VirtualHost.prototype._clearQueues = function (dgrid) + { + let selected = []; + for (let item in dgrid.selection) + { + if (dgrid.selection.hasOwnProperty(item) && dgrid.selection[item]) + { + selected.push(item); + } + } + if (selected.length > 0) + { + const plural = selected.length === 1 ? "" : "s"; + if (confirm(lang.replace("Are you sure you want to purge {0} queue{1}?", [selected.length, plural]))) + { + const modelObj = { + type: "virtualhost", + name: "clearQueues", + parent: this.modelObj + }; + this.management.update(modelObj, {"queues" : selected}).then(lang.hitch(this, function () + { + dgrid.clearSelection(); + this.vhostUpdater.update(); + })); + } + } + }; + VirtualHost.prototype.close = function () { updater.remove(this.vhostUpdater); diff --git a/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html b/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html index 217d43f..300f862 100644 --- a/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html +++ b/broker-plugins/management-http/src/main/java/resources/showVirtualHost.html @@ -127,6 +127,7 @@ <div class="queues"></div> <button data-dojo-type="dijit.form.Button" class="addQueueButton">Add Queue</button> <button data-dojo-type="dijit.form.Button" class="deleteQueueButton">Delete Queue</button> + <button data-dojo-type="dijit.form.Button" class="clearQueueButton">Clear Queue</button> </div> <br/> <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Connections'" class="virtualHostConnections"> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org