ARTEMIS-898 - Adding Plugin Support Adding a new ActiveMQServerPlugin interface to support adding custom behavior to the broker at certain events such as connection or session creation.
https://issues.apache.org/jira/browse/ARTEMIS-898 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1e1ede84 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1e1ede84 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1e1ede84 Branch: refs/heads/master Commit: 1e1ede84c0483099f27741bc046ef95c08e1d090 Parents: 303d97c Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Tue May 2 09:46:17 2017 -0400 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Wed May 3 11:21:32 2017 -0400 ---------------------------------------------------------------------- .../artemis/core/config/Configuration.java | 21 ++ .../core/config/impl/ConfigurationImpl.java | 24 ++ .../core/postoffice/impl/PostOfficeImpl.java | 4 + .../server/impl/RemotingServiceImpl.java | 7 +- .../artemis/core/server/ActiveMQServer.java | 18 +- .../core/server/cluster/ClusterManager.java | 3 + .../core/server/impl/ActiveMQServerImpl.java | 57 +++- .../core/server/impl/LastValueQueue.java | 6 +- .../core/server/impl/QueueFactoryImpl.java | 19 +- .../artemis/core/server/impl/QueueImpl.java | 31 +- .../core/server/impl/ServerConsumerImpl.java | 7 + .../core/server/impl/ServerSessionImpl.java | 33 +- .../server/plugin/ActiveMQPluginRunnable.java | 24 ++ .../server/plugin/ActiveMQServerPlugin.java | 336 +++++++++++++++++++ .../integration/amqp/AmqpClientTestSupport.java | 7 + .../integration/client/HangConsumerTest.java | 37 +- .../client/InterruptedLargeMessageTest.java | 18 +- .../jms/client/TopicCleanupTest.java | 12 +- .../openwire/amq/JmsResourceProvider.java | 2 +- .../integration/plugin/AmqpPluginTest.java | 131 ++++++++ .../integration/plugin/CorePluginTest.java | 257 ++++++++++++++ .../plugin/MethodCalledVerifier.java | 276 +++++++++++++++ .../integration/plugin/MqttPluginTest.java | 132 ++++++++ .../integration/plugin/OpenwirePluginTest.java 
| 109 ++++++ .../integration/plugin/StompPluginTest.java | 126 +++++++ .../timing/core/server/impl/QueueImplTest.java | 15 +- .../unit/core/server/impl/QueueImplTest.java | 3 +- .../server/impl/fakes/FakeQueueFactory.java | 7 +- 28 files changed, 1664 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 7dfb1a5..7da5b02 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; @@ -1081,4 +1082,24 @@ public interface Configuration { Configuration setNetworkCheckPing6Command(String command); String getInternalNamingPrefix(); + + /** + * @param plugins + */ + void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins); + + /** + * @param plugin + */ + void registerBrokerPlugin(ActiveMQServerPlugin plugin); + + /** + * @param plugin + */ + void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin); + + /** + * @return + */ + List<ActiveMQServerPlugin> 
getBrokerPlugins(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 2a538ca..8edeb5b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; @@ -63,6 +64,7 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.NetworkHealthCheck; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.utils.Env; @@ -232,6 +234,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private List<SecuritySettingPlugin> securitySettingPlugins = new ArrayList<>(); + private final List<ActiveMQServerPlugin> brokerPlugins = new CopyOnWriteArrayList<>(); + private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>(); protected List<ConnectorServiceConfiguration> 
connectorServiceConfigurations = new ArrayList<>(); @@ -1321,6 +1325,26 @@ public class ConfigurationImpl implements Configuration, Serializable { } @Override + public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) { + brokerPlugins.addAll(plugins); + } + + @Override + public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) { + brokerPlugins.add(plugin); + } + + @Override + public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) { + brokerPlugins.remove(plugin); + } + + @Override + public List<ActiveMQServerPlugin> getBrokerPlugins() { + return brokerPlugins; + } + + @Override public File getBrokerInstance() { if (artemisInstance != null) { return artemisInstance; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 2ef7657..a927768 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -783,7 +783,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } else { try { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null); processRoute(message, context, direct); + final RoutingStatus finalResult = result; + server.callBrokerPlugins(server.hasBrokerPlugins() ? 
plugin -> plugin.afterMessageRoute(message, context, direct, + rejectDuplicates, finalResult) : null); } catch (ActiveMQAddressFullException e) { if (startedTX.get()) { context.getTransaction().rollback(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index e0e5b52..7c9c675 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -514,6 +514,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif } ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null); if (logger.isTraceEnabled()) { logger.trace("Connection created " + connection); @@ -534,8 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif ConnectionEntry conn = connections.get(connectionID); if (conn != null && !conn.connection.isSupportReconnect()) { - removeConnection(connectionID); - + RemotingConnection removedConnection = removeConnection(connectionID); + if (removedConnection != null) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? 
plugin -> plugin.afterDestroyConnection(removedConnection) : null); + } conn.connection.fail(new ActiveMQRemoteDisconnectException()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index bfd9aec..e16557f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.server; -import javax.management.MBeanServer; import java.util.Collection; import java.util.List; import java.util.Map; @@ -24,6 +23,8 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import javax.management.MBeanServer; + import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.BridgeConfiguration; @@ -47,6 +48,8 @@ import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ConnectorsService; import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -186,6 
+189,18 @@ public interface ActiveMQServer extends ServiceComponent { */ void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception; + void registerBrokerPlugin(ActiveMQServerPlugin plugin); + + void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin); + + void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins); + + List<ActiveMQServerPlugin> getBrokerPlugins(); + + void callBrokerPlugins(ActiveMQPluginRunnable pluginRun); + + boolean hasBrokerPlugins(); + void checkQueueCreationLimit(String username) throws Exception; ServerSession createSession(String name, @@ -447,4 +462,5 @@ public interface ActiveMQServer extends ServiceComponent { void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception; String getInternalNamingPrefix(); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index d2219c2..70edb68 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -405,6 +405,8 @@ public final class ClusterManager implements ActiveMQComponent { return; } + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeployBridge(config) : null); + Queue queue = (Queue) binding.getBindable(); ServerLocatorInternal serverLocator; @@ -478,6 +480,7 @@ public final class ClusterManager implements ActiveMQComponent { bridge.start(); + server.callBrokerPlugins(server.hasBrokerPlugins() ? 
plugin -> plugin.afterDeployBridge(bridge) : null); } public static class IncomingInterceptorLookingForExceptionMessage implements Interceptor { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 8482cb3..06964ee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.core.server.impl; -import javax.management.MBeanServer; -import javax.security.cert.X509Certificate; import java.io.File; import java.io.IOException; import java.io.PrintWriter; @@ -48,6 +46,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.management.MBeanServer; +import javax.security.cert.X509Certificate; + import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; import org.apache.activemq.artemis.api.core.Pair; @@ -144,6 +145,8 @@ import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler; import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; +import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.server.reload.ReloadCallback; import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl; @@ -1309,10 +1312,15 @@ public class ActiveMQServerImpl implements ActiveMQServer { checkSessionLimit(validatedUser); + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection, + autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null); + final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes); sessions.put(name, session); + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateSession(session) : null); + return session; } @@ -1705,6 +1713,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { return; } + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount, + removeConsumers, autoDeleteAddress) : null); + addressSettingsRepository.clearCache(); Binding binding = postOffice.getBinding(queueName); @@ -1743,6 +1754,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { } callPostQueueDeletionCallbacks(address, queueName); + + callBrokerPlugins(hasBrokerPlugins() ? 
plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, + removeConsumers, autoDeleteAddress) : null); } @Override @@ -1808,6 +1822,38 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override + public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) { + configuration.registerBrokerPlugins(plugins); + } + + @Override + public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) { + configuration.registerBrokerPlugin(plugin); + } + + @Override + public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) { + configuration.unRegisterBrokerPlugin(plugin); + } + + @Override + public List<ActiveMQServerPlugin> getBrokerPlugins() { + return configuration.getBrokerPlugins(); + } + + @Override + public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) { + if (pluginRun != null) { + getBrokerPlugins().forEach(plugin -> pluginRun.run(plugin)); + } + } + + @Override + public boolean hasBrokerPlugins() { + return !getBrokerPlugins().isEmpty(); + } + + @Override public ExecutorFactory getExecutorFactory() { return executorFactory; } @@ -2103,7 +2149,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { securityStore = new SecurityStoreImpl(securityRepository, securityManager, configuration.getSecurityInvalidationInterval(), configuration.isSecurityEnabled(), configuration.getClusterUser(), configuration.getClusterPassword(), managementService); - queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager); + queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager, this); pagingManager = createPagingManager(); @@ -2508,6 +2554,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { final QueueConfig queueConfig = 
queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).build(); + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null); + final Queue queue = queueFactory.createQueueWith(queueConfig); if (transientQueue) { @@ -2550,6 +2598,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { callPostQueueCreationCallbacks(queue.getName()); + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null); + return queue; } @@ -2763,4 +2813,5 @@ public class ActiveMQServerImpl implements ActiveMQServer { deployAddressesFromConfiguration(config); } } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index ceec92c..8370839 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.MessageReference; @@ 
-63,8 +64,9 @@ public class LastValueQueue extends QueueImpl { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository<AddressSettings> addressSettingsRepository, - final Executor executor) { - super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + final Executor executor, + final ActiveMQServer server) { + super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 9258a07..3d8ceb1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; @@ -49,17 +50,19 @@ 
public class QueueFactoryImpl implements QueueFactory { protected final ExecutorFactory executorFactory; + protected final ActiveMQServer server; + public QueueFactoryImpl(final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutor, final HierarchicalRepository<AddressSettings> addressSettingsRepository, - final StorageManager storageManager) { - this.addressSettingsRepository = addressSettingsRepository; + final StorageManager storageManager, + final ActiveMQServer server) { + this.addressSettingsRepository = addressSettingsRepository; this.scheduledExecutor = scheduledExecutor; - this.storageManager = storageManager; - this.executorFactory = executorFactory; + this.server = server; } @Override @@ -72,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory { final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString()); final Queue queue; if (addressSettings.isLastValueQueue()) { - queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server); } else { - queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), 
config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server); } return queue; } @@ -98,9 +101,9 @@ public class QueueFactoryImpl implements QueueFactory { Queue queue; if (addressSettings.isLastValueQueue()) { - queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server); } else { - queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server); } return queue; 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index a62ae79..c2cfdef 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; @@ -69,7 +70,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; @@ -198,6 +198,8 @@ public class QueueImpl implements Queue { private final HierarchicalRepository<AddressSettings> addressSettingsRepository; + private final ActiveMQServer server; + private final ScheduledExecutorService scheduledExecutor; private final SimpleString address; @@ -330,8 
+332,9 @@ public class QueueImpl implements Queue { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository<AddressSettings> addressSettingsRepository, - final Executor executor) { - this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + final Executor executor, + final ActiveMQServer server) { + this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server); } public QueueImpl(final long id, @@ -347,8 +350,9 @@ public class QueueImpl implements Queue { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository<AddressSettings> addressSettingsRepository, - final Executor executor) { - this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + final Executor executor, + final ActiveMQServer server) { + this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server); } public QueueImpl(final long id, @@ -367,7 +371,8 @@ public class QueueImpl implements Queue { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository<AddressSettings> addressSettingsRepository, - final Executor executor) { + final Executor executor, + final ActiveMQServer server) { this.id = id; @@ -401,6 +406,8 @@ public class QueueImpl implements Queue { this.scheduledExecutor = scheduledExecutor; + this.server = server; + scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor); if (addressSettingsRepository != null) { @@ -1078,6 +1085,9 @@ public class 
QueueImpl implements Queue { messagesAcknowledged++; } + if (server != null) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null); + } } @Override @@ -1112,6 +1122,10 @@ public class QueueImpl implements Queue { } else { messagesAcknowledged++; } + + if (server != null) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null); + } } @Override @@ -1195,6 +1209,11 @@ public class QueueImpl implements Queue { } acknowledge(ref, AckReason.EXPIRED); } + + if (server != null) { + final SimpleString expiryAddress = messageExpiryAddress; + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageExpired(ref, expiryAddress) : null); + } } private SimpleString expiryAddressFromMessageAddress(MessageReference ref) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 9e33602..af8524d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -416,6 +416,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { try { Message message = reference.getMessage(); + server.callBrokerPlugins(server.hasBrokerPlugins() ? 
plugin -> plugin.beforeDeliver(reference) : null); + if (message.isLargeMessage() && supportLargeMessage) { if (largeMessageDeliverer == null) { // This can't really happen as handle had already crated the deliverer @@ -432,6 +434,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } finally { lockDelivery.readLock().unlock(); callback.afterDelivery(); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeliver(reference) : null); } } @@ -447,6 +450,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); } + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseConsumer(this, failed) : null); + setStarted(false); LargeMessageDeliverer del = largeMessageDeliverer; @@ -501,6 +506,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { managementService.sendNotification(notification); } + + server.callBrokerPlugins(server.hasBrokerPlugins() ? 
plugin -> plugin.afterCloseConsumer(this, failed) : null); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index edd7afc..7245843 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -16,10 +16,8 @@ */ package org.apache.activemq.artemis.core.server.impl; -import javax.json.JsonArrayBuilder; -import javax.json.JsonObjectBuilder; -import javax.transaction.xa.XAException; -import javax.transaction.xa.Xid; +import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -31,6 +29,11 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import javax.json.JsonArrayBuilder; +import javax.json.JsonObjectBuilder; +import javax.transaction.xa.XAException; +import javax.transaction.xa.Xid; + import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; @@ -89,8 +92,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil; import org.apache.activemq.artemis.utils.TypedProperties; import org.jboss.logging.Logger; -import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe; - /** * Server side Session implementation */ @@ -345,6 +346,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { protected void 
doClose(final boolean failed) throws Exception { synchronized (this) { + if (!closed) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null); + } this.setStarted(false); if (closed) return; @@ -395,6 +399,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } closed = true; + + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseSession(this, failed) : null); } } @@ -444,9 +450,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener { Filter filter = FilterImpl.createFilter(filterString); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, unPrefixedQueueName, + filterString, browseOnly, supportLargeMessage) : null); + ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); consumers.put(consumer.getID(), consumer); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConsumer(consumer) : null); + if (!browseOnly) { TypedProperties props = new TypedProperties(); @@ -1290,6 +1301,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final boolean direct, boolean noAutoCreateQueue) throws Exception { + server.callBrokerPlugins(server.hasBrokerPlugins() ? 
plugin -> plugin.beforeSend(tx, message, direct, noAutoCreateQueue) : null); + // If the protocol doesn't support flow control, we have no choice other than fail the communication if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); @@ -1333,10 +1346,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (message.getAddressSimpleString().equals(managementAddress)) { // It's a management message - handleManagementMessage(tx, message, direct); + result = handleManagementMessage(tx, message, direct); } else { result = doSend(tx, message, address, direct, noAutoCreateQueue); } + + final RoutingStatus finalResult = result; + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(tx, message, direct, noAutoCreateQueue, finalResult) : null); + return result; } @@ -1367,10 +1384,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public void addMetaData(String key, String data) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null); if (metaData == null) { metaData = new HashMap<>(); } metaData.put(key, data); + server.callBrokerPlugins(server.hasBrokerPlugins() ? 
plugin -> plugin.afterSessionMetadataAdded(this, key, data) : null); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java new file mode 100644 index 0000000..bc85475 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.core.server.plugin; + +public interface ActiveMQPluginRunnable { + + void run(ActiveMQServerPlugin plugin); + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java new file mode 100644 index 0000000..95296f0 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.core.server.plugin; + +import java.util.Map; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; +import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.cluster.Bridge; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; + + +public interface ActiveMQServerPlugin { + + + /** + * A connection has been created. + * + * @param connection The newly created connection + */ + default void afterCreateConnection(RemotingConnection connection) { + + } + + /** + * A connection has been destroyed. + * + * @param connection + */ + default void afterDestroyConnection(RemotingConnection connection) { + + } + + /** + * Before a session is created. 
+ * + * @param name + * @param username + * @param minLargeMessageSize + * @param connection + * @param autoCommitSends + * @param autoCommitAcks + * @param preAcknowledge + * @param xa + * @param defaultAddress + * @param callback + * @param autoCreateQueues + * @param context + * @param prefixes + */ + default void beforeCreateSession(String name, String username, int minLargeMessageSize, + RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, + boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context, + Map<SimpleString, RoutingType> prefixes) { + + } + + /** + * After a session has been created. + * + * @param session The newly created session + */ + default void afterCreateSession(ServerSession session) { + + } + + /** + * Before a session is closed + * + * @param session + * @param failed + */ + default void beforeCloseSession(ServerSession session, boolean failed) { + + } + + /** + * After a session is closed + * + * @param session + * @param failed + */ + default void afterCloseSession(ServerSession session, boolean failed) { + + } + + /** + * Before session metadata is added to the session + * + * @param session + * @param key + * @param data + */ + default void beforeSessionMetadataAdded(ServerSession session, String key, String data) { + + } + + /** + * After session metadata is added to the session + * + * @param session + * @param key + * @param data + */ + default void afterSessionMetadataAdded(ServerSession session, String key, String data) { + + } + + /** + * Before a consumer is created + * + * @param consumerID + * @param queueName + * @param filterString + * @param browseOnly + * @param supportLargeMessage + */ + default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, + boolean browseOnly, boolean supportLargeMessage) { + + } + + /** + * After a consumer has been created + * + * @param consumer the 
created consumer + */ + default void afterCreateConsumer(ServerConsumer consumer) { + + } + + /** + * Before a consumer is closed + * + * @param consumer + * @param failed + */ + default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) { + + } + + /** + * After a consumer is closed + * + * @param consumer + * @param failed + */ + default void afterCloseConsumer(ServerConsumer consumer, boolean failed) { + + } + + /** + * Before a queue is created + * + * @param queueConfig + */ + default void beforeCreateQueue(QueueConfig queueConfig) { + + } + + /** + * After a queue has been created + * + * @param queue The newly created queue + */ + default void afterCreateQueue(Queue queue) { + + } + + /** + * Before a queue is destroyed + * + * @param queueName + * @param session + * @param checkConsumerCount + * @param removeConsumers + * @param autoDeleteAddress + */ + default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount, + boolean removeConsumers, boolean autoDeleteAddress) { + + } + + /** + * After a queue has been destroyed + * + * @param queue + * @param address + * @param session + * @param checkConsumerCount + * @param removeConsumers + * @param autoDeleteAddress + */ + default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount, + boolean removeConsumers, boolean autoDeleteAddress) { + + } + + /** + * Before a message is sent + * + * @param tx + * @param message + * @param direct + * @param noAutoCreateQueue + */ + default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) { + + } + + /** + * After a message is sent + * + * @param tx + * @param message + * @param direct + * @param noAutoCreateQueue + * @param result + */ + default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, + RoutingStatus result) { + + } + + /** + * Before a message is routed + * 
+ * @param message + * @param context + * @param direct + * @param rejectDuplicates + */ + default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) { + + } + + /** + * After a message is routed + * + * @param message + * @param context + * @param direct + * @param rejectDuplicates + * @param result + */ + default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, + RoutingStatus result) { + + } + + /** + * Before a message is delivered to a client consumer + * + * @param reference + */ + default void beforeDeliver(MessageReference reference) { + + } + + /** + * After a message is delivered to a client consumer + * + * @param reference + */ + default void afterDeliver(MessageReference reference) { + + } + + /** + * A message has been expired + * + * @param message The expired message + * @param messageExpiryAddress The message expiry address if exists + */ + default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) { + + } + + /** + * A message has been acknowledged + * + * @param ref The acked message + * @param reason The ack reason + */ + default void messageAcknowledged(MessageReference ref, AckReason reason) { + + } + + /** + * Before a bridge is deployed + * + * @param config The bridge configuration + */ + default void beforeDeployBridge(BridgeConfiguration config) { + + } + + /** + * After a bridge has been deployed + * + * @param bridge The newly deployed bridge + */ + default void afterDeployBridge(Bridge bridge) { + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 8d27895..60b9b74 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -166,6 +166,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport { // Add optional security for tests that need it configureBrokerSecurity(server); + // Add extra configuration + addConfiguration(server); + server.start(); // Prepare all addresses and queues for client tests. @@ -174,6 +177,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport { return server; } + protected void addConfiguration(ActiveMQServer server) { + + } + protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) { HashMap<String, Object> params = new HashMap<>(); params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 201a96b..da695ca 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.client; -import javax.management.MBeanServer; import java.lang.management.ManagementFactory; import 
java.util.LinkedList; import java.util.Map; @@ -26,10 +25,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import javax.management.MBeanServer; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; - import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -53,7 +54,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; @@ -234,8 +234,10 @@ public class HangConsumerTest extends ActiveMQTestBase { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository<AddressSettings> addressSettingsRepository, - final Executor executor) { - super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + final Executor executor, final ActiveMQServer server) { + super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode, + maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, + addressSettingsRepository, executor, server); } @Override @@ -256,13 
+258,18 @@ public class HangConsumerTest extends ActiveMQTestBase { LocalFactory(final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutor, final HierarchicalRepository<AddressSettings> addressSettingsRepository, - final StorageManager storageManager) { - super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager); + final StorageManager storageManager, final ActiveMQServer server) { + super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager, server); } @Override public Queue createQueueWith(final QueueConfig config) { - queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), + config.user(), config.pageSubscription(), config.isDurable(), + config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), + config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, + postOffice, storageManager, addressSettingsRepository, + executorFactory.getExecutor(), server); return queue; } @@ -277,13 +284,18 @@ public class HangConsumerTest extends ActiveMQTestBase { final boolean durable, final boolean temporary, final boolean autoCreated) { - queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, + temporary, autoCreated, RoutingType.MULTICAST, 
null, null, + scheduledExecutor, postOffice, storageManager, addressSettingsRepository, + executorFactory.getExecutor(), server); return queue; } } - LocalFactory queueFactory = new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(), server.getAddressSettingsRepository(), server.getStorageManager()); + LocalFactory queueFactory = + new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(), + server.getAddressSettingsRepository(), server.getStorageManager(), server); queueFactory.setPostOffice(server.getPostOffice()); @@ -359,7 +371,10 @@ public class HangConsumerTest extends ActiveMQTestBase { long txID = server.getStorageManager().generateID(); // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally - LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID()); + LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, + new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, + false, null, null, null, null, null, null), + server.getNodeID()); server.getStorageManager().addQueueBinding(txID, newBinding); server.getStorageManager().commitBindings(txID); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java index 540baf6..44015e1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java +++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java @@ -16,12 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.client; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.CountDownLatch; @@ -29,9 +23,17 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -52,7 +54,6 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; @@ -518,7 +519,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { StorageManager storageManager, HierarchicalRepository<AddressSettings> 
addressSettingsRepository, Executor executor) { - super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, + postOffice, storageManager, addressSettingsRepository, executor, null); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java index f8094a1..63743ed 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java @@ -16,14 +16,15 @@ */ package org.apache.activemq.artemis.tests.integration.jms.client; +import java.util.List; +import java.util.Map; + import javax.jms.Connection; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import java.util.List; -import java.util.Map; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; @@ -81,7 +82,12 @@ public class TopicCleanupTest extends JMSTestBase { for (int i = 0; i < 100; i++) { long txid = storage.generateID(); - final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"), SimpleString.toSimpleString("topic"), 
FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor()); + final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"), + SimpleString.toSimpleString("topic"), + FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, + true, false, false, server.getScheduledPool(), server.getPostOffice(), + storage, server.getAddressSettingsRepository(), + server.getExecutorFactory().getExecutor(), server); LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java index 056891a..bd8cfd8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java @@ -45,7 +45,7 @@ public class JmsResourceProvider { /** * Creates a connection. 
* - * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory) + * @see org.apache.activemq.test.JmsResourceProvider#afterCreateConnection(javax.jms.ConnectionFactory) */ public Connection createConnection(ConnectionFactory cf) throws JMSException { Connection connection = cf.createConnection(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java new file mode 100644 index 0000000..d918b27 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test basic send and receive scenarios using only AMQP sender and receiver links. 
+ */ +public class AmqpPluginTest extends AmqpClientTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(AmqpPluginTest.class); + + private final Map<String, AtomicInteger> methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + + @Override + protected void addConfiguration(ActiveMQServer server) { + super.addConfiguration(server); + server.registerBrokerPlugin(verifier); + } + + @Test(timeout = 60000) + public void testQueueReceiverReadAndAckMessage() throws Exception { + sendMessages(getQueueName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + Queue queueView = getProxyToQueue(getQueueName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + receiver.close(); + connection.close(); + + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, + BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, + BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, + BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, + AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER); + } + + @Override + public void sendMessages(String destinationName, int count) throws Exception { + sendMessages(destinationName, count, null); + } + + @Override + public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = 
addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + if (routingType != null) { + message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType()); + } + sender.send(message); + } + } finally { + connection.close(); + } + } +}
