ARTEMIS-2019 - Seperate ServerPlugin Interfaces Seperate plugin interface by area, all extending a base interface. Update code to check and call only plugins implementing specific interfaces. Existing interface extends all the new interfaces for back compatibility or those who want simplicity and don't care about perf.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/19e1bbeb Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/19e1bbeb Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/19e1bbeb Branch: refs/heads/master Commit: 19e1bbeb49ced9a8f7b5be09148ba27b53af3172 Parents: bf073f1 Author: Michael André Pearce <[email protected]> Authored: Thu Aug 9 13:43:45 2018 +0100 Committer: Clebert Suconic <[email protected]> Committed: Thu Aug 9 19:42:28 2018 -0400 ---------------------------------------------------------------------- .../artemis/core/config/Configuration.java | 64 +- .../core/config/impl/ConfigurationImpl.java | 131 +++- .../core/postoffice/impl/PostOfficeImpl.java | 48 +- .../server/impl/RemotingServiceImpl.java | 8 +- .../artemis/core/server/ActiveMQServer.java | 73 +- .../core/server/cluster/ClusterManager.java | 8 +- .../core/server/cluster/impl/BridgeImpl.java | 12 +- .../core/server/impl/ActiveMQServerImpl.java | 203 +++++- .../artemis/core/server/impl/QueueImpl.java | 12 +- .../core/server/impl/ServerConsumerImpl.java | 16 +- .../core/server/impl/ServerSessionImpl.java | 36 +- .../server/plugin/ActiveMQPluginRunnable.java | 4 +- .../plugin/ActiveMQServerAddressPlugin.java | 95 +++ .../server/plugin/ActiveMQServerBasePlugin.java | 52 ++ .../plugin/ActiveMQServerBindingPlugin.java | 73 ++ .../plugin/ActiveMQServerBridgePlugin.java | 85 +++ .../plugin/ActiveMQServerConnectionPlugin.java | 49 ++ .../plugin/ActiveMQServerConsumerPlugin.java | 97 +++ .../plugin/ActiveMQServerCriticalPlugin.java | 36 + .../plugin/ActiveMQServerMessagePlugin.java | 236 +++++++ .../server/plugin/ActiveMQServerPlugin.java | 673 +------------------ .../plugin/ActiveMQServerQueuePlugin.java | 81 +++ .../plugin/ActiveMQServerSessionPlugin.java | 126 ++++ .../core/config/impl/FileConfigurationTest.java | 3 +- 24 files changed, 1450 insertions(+), 771 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 5d9c2eb..b1c49c3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -23,6 +23,16 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; @@ -33,7 +43,6 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; @@ -1129,20 +1138,65 @@ public interface Configuration { /** * @param plugins */ - void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins); + void registerBrokerPlugins(List<ActiveMQServerBasePlugin> plugins); /** * @param plugin */ - void registerBrokerPlugin(ActiveMQServerPlugin plugin); + void registerBrokerPlugin(ActiveMQServerBasePlugin plugin); /** * @param plugin */ - void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin); + void unRegisterBrokerPlugin(ActiveMQServerBasePlugin plugin); /** * @return */ - List<ActiveMQServerPlugin> getBrokerPlugins(); + List<ActiveMQServerBasePlugin> getBrokerPlugins(); + + /** + * @return + */ + List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins(); + + /** + * @return + */ + List<ActiveMQServerSessionPlugin> getBrokerSessionPlugins(); + + /** + * @return + */ + List<ActiveMQServerConsumerPlugin> getBrokerConsumerPlugins(); + + /** + * @return + */ + List<ActiveMQServerAddressPlugin> getBrokerAddressPlugins(); + + /** + * @return + */ + List<ActiveMQServerQueuePlugin> getBrokerQueuePlugins(); + + /** + * @return + */ + List<ActiveMQServerBindingPlugin> getBrokerBindingPlugins(); + + /** + * @return + */ + List<ActiveMQServerMessagePlugin> getBrokerMessagePlugins(); + + /** + * @return + */ + List<ActiveMQServerBridgePlugin> getBrokerBridgePlugins(); + + /** + * @return + */ + List<ActiveMQServerCriticalPlugin> getBrokerCriticalPlugins(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index ae4a25f..3e51d63 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -42,6 +42,16 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; @@ -66,7 +76,6 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.NetworkHealthCheck; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.utils.Env; @@ -244,7 +253,16 @@ public class ConfigurationImpl implements Configuration, Serializable { private List<SecuritySettingPlugin> securitySettingPlugins = new ArrayList<>(); - private final List<ActiveMQServerPlugin> brokerPlugins = new CopyOnWriteArrayList<>(); + private final List<ActiveMQServerBasePlugin> brokerPlugins = new CopyOnWriteArrayList<>(); + private final List<ActiveMQServerConnectionPlugin> brokerConnectionPlugins = new CopyOnWriteArrayList<>(); + private final List<ActiveMQServerSessionPlugin> brokerSessionPlugins = new CopyOnWriteArrayList<>(); + private final List<ActiveMQServerConsumerPlugin> brokerConsumerPlugins = new CopyOnWriteArrayList<>(); + private final List<ActiveMQServerAddressPlugin> brokerAddressPlugins = new CopyOnWriteArrayList<>(); + private final List<ActiveMQServerQueuePlugin> brokerQueuePlugins = new CopyOnWriteArrayList<>(); + private final List<ActiveMQServerBindingPlugin> brokerBindingPlugins = new CopyOnWriteArrayList<>(); + private final List<ActiveMQServerMessagePlugin> brokerMessagePlugins = new CopyOnWriteArrayList<>(); + private final List<ActiveMQServerBridgePlugin> brokerBridgePlugins = new CopyOnWriteArrayList<>(); + private final List<ActiveMQServerCriticalPlugin> brokerCriticalPlugins = new CopyOnWriteArrayList<>(); private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>(); @@ -1371,26 +1389,125 @@ public class ConfigurationImpl implements Configuration, Serializable { } @Override - public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) { - brokerPlugins.addAll(plugins); + public void registerBrokerPlugins(final List<ActiveMQServerBasePlugin> plugins) { + plugins.forEach(plugin -> registerBrokerPlugin(plugin)); } @Override - public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) { + public void registerBrokerPlugin(final ActiveMQServerBasePlugin plugin) { brokerPlugins.add(plugin); + if (plugin instanceof ActiveMQServerConnectionPlugin) { + brokerConnectionPlugins.add((ActiveMQServerConnectionPlugin) plugin); + } + if (plugin instanceof ActiveMQServerSessionPlugin) { + brokerSessionPlugins.add((ActiveMQServerSessionPlugin) plugin); + } + if (plugin instanceof ActiveMQServerConsumerPlugin) { + brokerConsumerPlugins.add((ActiveMQServerConsumerPlugin) plugin); + } + if (plugin instanceof ActiveMQServerAddressPlugin) { + brokerAddressPlugins.add((ActiveMQServerAddressPlugin) plugin); + } + if (plugin instanceof ActiveMQServerQueuePlugin) { + brokerQueuePlugins.add((ActiveMQServerQueuePlugin) plugin); + } + if (plugin instanceof ActiveMQServerBindingPlugin) { + brokerBindingPlugins.add((ActiveMQServerBindingPlugin) plugin); + } + if (plugin instanceof ActiveMQServerMessagePlugin) { + brokerMessagePlugins.add((ActiveMQServerMessagePlugin) plugin); + } + if (plugin instanceof ActiveMQServerBridgePlugin) { + brokerBridgePlugins.add((ActiveMQServerBridgePlugin) plugin); + } + if (plugin instanceof ActiveMQServerCriticalPlugin) { + brokerCriticalPlugins.add((ActiveMQServerCriticalPlugin) plugin); + } } @Override - public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) { + public void unRegisterBrokerPlugin(final ActiveMQServerBasePlugin plugin) { brokerPlugins.remove(plugin); + if (plugin instanceof ActiveMQServerConnectionPlugin) { + brokerConnectionPlugins.remove(plugin); + } + if (plugin instanceof ActiveMQServerSessionPlugin) { + brokerSessionPlugins.remove(plugin); + } + if (plugin instanceof ActiveMQServerConsumerPlugin) { + brokerConsumerPlugins.remove(plugin); + } + if (plugin instanceof ActiveMQServerAddressPlugin) { + brokerAddressPlugins.remove(plugin); + } + if (plugin instanceof ActiveMQServerQueuePlugin) { + brokerQueuePlugins.remove(plugin); + } + if (plugin instanceof ActiveMQServerBindingPlugin) { + brokerBindingPlugins.remove(plugin); + } + if (plugin instanceof ActiveMQServerMessagePlugin) { + brokerMessagePlugins.remove(plugin); + } + if (plugin instanceof ActiveMQServerBridgePlugin) { + brokerBridgePlugins.remove(plugin); + } + if (plugin instanceof ActiveMQServerCriticalPlugin) { + brokerCriticalPlugins.remove(plugin); + } } @Override - public List<ActiveMQServerPlugin> getBrokerPlugins() { + public List<ActiveMQServerBasePlugin> getBrokerPlugins() { return brokerPlugins; } @Override + public List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins() { + return brokerConnectionPlugins; + } + + @Override + public List<ActiveMQServerSessionPlugin> getBrokerSessionPlugins() { + return brokerSessionPlugins; + } + + @Override + public List<ActiveMQServerConsumerPlugin> getBrokerConsumerPlugins() { + return brokerConsumerPlugins; + } + + @Override + public List<ActiveMQServerAddressPlugin> getBrokerAddressPlugins() { + return brokerAddressPlugins; + } + + @Override + public List<ActiveMQServerQueuePlugin> getBrokerQueuePlugins() { + return brokerQueuePlugins; + } + + @Override + public List<ActiveMQServerBindingPlugin> getBrokerBindingPlugins() { + return brokerBindingPlugins; + } + + @Override + public List<ActiveMQServerMessagePlugin> getBrokerMessagePlugins() { + return brokerMessagePlugins; + } + + @Override + public List<ActiveMQServerBridgePlugin> getBrokerBridgePlugins() { + return brokerBridgePlugins; + } + + @Override + public List<ActiveMQServerCriticalPlugin> getBrokerCriticalPlugins() { + return brokerCriticalPlugins; + } + + @Override public File getBrokerInstance() { if (artemisInstance != null) { return artemisInstance; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 247b9ed..9a3e844 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -435,8 +435,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) throws Exception { synchronized (addressLock) { - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeAddAddress(addressInfo, reload)); + if (server.hasBrokerAddressPlugins()) { + server.callBrokerAddressPlugins(plugin -> plugin.beforeAddAddress(addressInfo, reload)); } boolean result; @@ -451,8 +451,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (!addressInfo.isInternal()) { managementService.registerAddress(addressInfo); } - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterAddAddress(addressInfo, reload)); + if (server.hasBrokerAddressPlugins()) { + server.callBrokerAddressPlugins(plugin -> plugin.afterAddAddress(addressInfo, reload)); } } catch (Exception e) { e.printStackTrace(); @@ -552,13 +552,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding public AddressInfo updateAddressInfo(SimpleString addressName, EnumSet<RoutingType> routingTypes) throws Exception { synchronized (addressLock) { - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeUpdateAddress(addressName, routingTypes)); + if (server.hasBrokerAddressPlugins()) { + server.callBrokerAddressPlugins(plugin -> plugin.beforeUpdateAddress(addressName, routingTypes)); } final AddressInfo address = addressManager.updateAddressInfo(addressName, routingTypes); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterUpdateAddress(address)); + if (server.hasBrokerAddressPlugins()) { + server.callBrokerAddressPlugins(plugin -> plugin.afterUpdateAddress(address)); } return address; @@ -574,8 +574,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public AddressInfo removeAddressInfo(SimpleString address, boolean force) throws Exception { synchronized (addressLock) { - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeRemoveAddress(address)); + if (server.hasBrokerAddressPlugins()) { + server.callBrokerAddressPlugins(plugin -> plugin.beforeRemoveAddress(address)); } final Bindings bindingsForAddress = getDirectBindings(address); @@ -593,8 +593,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } managementService.unregisterAddress(address); final AddressInfo addressInfo = addressManager.removeAddressInfo(address); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterRemoveAddress(address, addressInfo)); + if (server.hasBrokerAddressPlugins()) { + server.callBrokerAddressPlugins(plugin -> plugin.afterRemoveAddress(address, addressInfo)); } return addressInfo; @@ -628,8 +628,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // even though failover is complete @Override public synchronized void addBinding(final Binding binding) throws Exception { - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeAddBinding(binding)); + if (server.hasBrokerBindingPlugins()) { + server.callBrokerBindingPlugins(plugin -> plugin.beforeAddBinding(binding)); } addressManager.addBinding(binding); @@ -662,8 +662,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding managementService.sendNotification(new Notification(uid, CoreNotificationType.BINDING_ADDED, props)); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterAddBinding(binding)); + if (server.hasBrokerBindingPlugins()) { + server.callBrokerBindingPlugins(plugin -> plugin.afterAddBinding(binding)); } } @@ -673,8 +673,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding Transaction tx, boolean deleteData) throws Exception { - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeRemoveBinding(uniqueName, tx, deleteData)); + if (server.hasBrokerBindingPlugins()) { + server.callBrokerBindingPlugins(plugin -> plugin.beforeRemoveBinding(uniqueName, tx, deleteData)); } addressSettingsRepository.clearCache(); @@ -722,8 +722,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding binding.close(); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterRemoveBinding(binding, tx, deleteData) ); + if (server.hasBrokerBindingPlugins()) { + server.callBrokerBindingPlugins(plugin -> plugin.afterRemoveBinding(binding, tx, deleteData) ); } return binding; @@ -869,8 +869,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates)); + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates)); } if (logger.isTraceEnabled()) { @@ -935,8 +935,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding context.getTransaction().commit(); } - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result)); + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result)); } return result; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 5a1ddef..09d67ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -515,8 +515,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection); try { - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterCreateConnection(entry.connection)); + if (server.hasBrokerConnectionPlugins()) { + server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection)); } } catch (ActiveMQException t) { logger.warn("Error executing afterCreateConnection plugin method: {}", t.getMessage(), t); @@ -549,8 +549,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif RemotingConnection removedConnection = removeConnection(connectionID); if (removedConnection != null) { try { - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterDestroyConnection(removedConnection)); + if (server.hasBrokerConnectionPlugins()) { + server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection)); } } catch (ActiveMQException t) { logger.warn("Error executing afterDestroyConnection plugin method: {}", t.getMessage(), t); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index e15feb4..1883362 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -52,7 +52,16 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ConnectorsService; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -225,18 +234,72 @@ public interface ActiveMQServer extends ServiceComponent { */ void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception; - void registerBrokerPlugin(ActiveMQServerPlugin plugin); + void registerBrokerPlugin(ActiveMQServerBasePlugin plugin); - void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin); + void unRegisterBrokerPlugin(ActiveMQServerBasePlugin plugin); - void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins); + void registerBrokerPlugins(List<ActiveMQServerBasePlugin> plugins); - List<ActiveMQServerPlugin> getBrokerPlugins(); + List<ActiveMQServerBasePlugin> getBrokerPlugins(); + + List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins(); + + List<ActiveMQServerSessionPlugin> getBrokerSessionPlugins(); + + List<ActiveMQServerConsumerPlugin> getBrokerConsumerPlugins(); + + List<ActiveMQServerAddressPlugin> getBrokerAddressPlugins(); + + List<ActiveMQServerQueuePlugin> getBrokerQueuePlugins(); + + List<ActiveMQServerBindingPlugin> getBrokerBindingPlugins(); + + List<ActiveMQServerMessagePlugin> getBrokerMessagePlugins(); + + List<ActiveMQServerBridgePlugin> getBrokerBridgePlugins(); + + List<ActiveMQServerCriticalPlugin> getBrokerCriticalPlugins(); void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException; + void callBrokerConnectionPlugins(ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException; + + void callBrokerSessionPlugins(ActiveMQPluginRunnable<ActiveMQServerSessionPlugin> pluginRun) throws ActiveMQException; + + void callBrokerConsumerPlugins(ActiveMQPluginRunnable<ActiveMQServerConsumerPlugin> pluginRun) throws ActiveMQException; + + void callBrokerAddressPlugins(ActiveMQPluginRunnable<ActiveMQServerAddressPlugin> pluginRun) throws ActiveMQException; + + void callBrokerQueuePlugins(ActiveMQPluginRunnable<ActiveMQServerQueuePlugin> pluginRun) throws ActiveMQException; + + void callBrokerBindingPlugins(ActiveMQPluginRunnable<ActiveMQServerBindingPlugin> pluginRun) throws ActiveMQException; + + void callBrokerMessagePlugins(ActiveMQPluginRunnable<ActiveMQServerMessagePlugin> pluginRun) throws ActiveMQException; + + void callBrokerBridgePlugins(ActiveMQPluginRunnable<ActiveMQServerBridgePlugin> pluginRun) throws ActiveMQException; + + void callBrokerCriticalPlugins(ActiveMQPluginRunnable<ActiveMQServerCriticalPlugin> pluginRun) throws ActiveMQException; + boolean hasBrokerPlugins(); + boolean hasBrokerConnectionPlugins(); + + boolean hasBrokerSessionPlugins(); + + boolean hasBrokerConsumerPlugins(); + + boolean hasBrokerAddressPlugins(); + + boolean hasBrokerQueuePlugins(); + + boolean hasBrokerBindingPlugins(); + + boolean hasBrokerMessagePlugins(); + + boolean hasBrokerBridgePlugins(); + + boolean hasBrokerCriticalPlugins(); + void checkQueueCreationLimit(String username) throws Exception; ServerSession createSession(String name, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index 82e7b8d..a354033 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -406,8 +406,8 @@ public final class ClusterManager implements ActiveMQComponent { return; } - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeDeployBridge(config)); + if (server.hasBrokerBridgePlugins()) { + server.callBrokerBridgePlugins(plugin -> plugin.beforeDeployBridge(config)); } Queue queue = (Queue) binding.getBindable(); @@ -483,8 +483,8 @@ public final class ClusterManager implements ActiveMQComponent { bridge.start(); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterDeployBridge(bridge)); + if (server.hasBrokerBridgePlugins()) { + server.callBrokerBridgePlugins(plugin -> plugin.afterDeployBridge(bridge)); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index d2c886b..20b5ac9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -522,8 +522,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled pendingAcks.countDown(); metrics.incrementMessagesAcknowledged(); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterAcknowledgeBridge(this, ref)); + if (server.hasBrokerBridgePlugins()) { + server.callBrokerBridgePlugins(plugin -> plugin.afterAcknowledgeBridge(this, ref)); } } else { if (logger.isTraceEnabled()) { @@ -618,8 +618,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled pendingAcks.countUp(); try { - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeDeliverBridge(this, ref)); + if (server.hasBrokerBridgePlugins()) { + server.callBrokerBridgePlugins(plugin -> plugin.beforeDeliverBridge(this, ref)); } final HandleStatus status; @@ -636,8 +636,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled metrics.incrementMessagesPendingAcknowledgement(); } - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterDeliverBridge(this, ref, status)); + if (server.hasBrokerBridgePlugins()) { + server.callBrokerBridgePlugins(plugin -> plugin.afterDeliverBridge(this, ref, status)); } return status; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index a71a862..a8e6447 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -149,7 +149,16 @@ import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; import org.apache.activemq.artemis.core.server.reload.ReloadCallback; import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl; @@ -655,7 +664,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public void run() { try { - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.criticalFailure(criticalComponent) : null); + if (hasBrokerCriticalPlugins()) { + callBrokerCriticalPlugins(plugin -> plugin.criticalFailure(criticalComponent)); + } } catch (Throwable e) { logger.warn(e.getMessage(), e); } @@ -1412,14 +1423,17 @@ public class ActiveMQServerImpl implements ActiveMQServer { checkSessionLimit(validatedUser); - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection, - autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null); - + if (hasBrokerSessionPlugins()) { + callBrokerSessionPlugins(plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection, + autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes)); + } final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes); sessions.put(name, session); - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateSession(session) : null); + if (hasBrokerSessionPlugins()) { + callBrokerSessionPlugins(plugin -> plugin.afterCreateSession(session)); + } return session; } @@ -1898,8 +1912,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { return; } - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount, - removeConsumers, autoDeleteAddress) : null); + if (hasBrokerQueuePlugins()) { + callBrokerQueuePlugins(plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount, + removeConsumers, autoDeleteAddress)); + } addressSettingsRepository.clearCache(); @@ -1930,8 +1946,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { queue.deleteQueue(removeConsumers); - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, - removeConsumers, autoDeleteAddress) : null); + if (hasBrokerQueuePlugins()) { + callBrokerQueuePlugins(plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, + removeConsumers, autoDeleteAddress)); + } AddressInfo addressInfo = getAddressInfo(address); if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated()) { @@ -2008,32 +2026,126 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) { + public void registerBrokerPlugins(final List<ActiveMQServerBasePlugin> plugins) { configuration.registerBrokerPlugins(plugins); plugins.forEach(plugin -> plugin.registered(this)); } @Override - public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) { + public void registerBrokerPlugin(final ActiveMQServerBasePlugin plugin) { configuration.registerBrokerPlugin(plugin); plugin.registered(this); } @Override - public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) { + public void unRegisterBrokerPlugin(final ActiveMQServerBasePlugin plugin) { configuration.unRegisterBrokerPlugin(plugin); plugin.unregistered(this); } @Override - public List<ActiveMQServerPlugin> getBrokerPlugins() { + public List<ActiveMQServerBasePlugin> getBrokerPlugins() { return configuration.getBrokerPlugins(); } @Override + public List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins() { + return configuration.getBrokerConnectionPlugins(); + } + + @Override + public List<ActiveMQServerSessionPlugin> getBrokerSessionPlugins() { + return configuration.getBrokerSessionPlugins(); + } + + @Override + public List<ActiveMQServerConsumerPlugin> getBrokerConsumerPlugins() { + return configuration.getBrokerConsumerPlugins(); + } + + @Override + public List<ActiveMQServerAddressPlugin> getBrokerAddressPlugins() { + return configuration.getBrokerAddressPlugins(); + } + + @Override + public List<ActiveMQServerQueuePlugin> getBrokerQueuePlugins() { + return configuration.getBrokerQueuePlugins(); + } + + @Override + public List<ActiveMQServerBindingPlugin> getBrokerBindingPlugins() { + return configuration.getBrokerBindingPlugins(); + } + + @Override + public List<ActiveMQServerMessagePlugin> getBrokerMessagePlugins() { + return configuration.getBrokerMessagePlugins(); + } + + @Override + public List<ActiveMQServerBridgePlugin> getBrokerBridgePlugins() { + return configuration.getBrokerBridgePlugins(); + } + + @Override + public List<ActiveMQServerCriticalPlugin> getBrokerCriticalPlugins() { + return configuration.getBrokerCriticalPlugins(); + } + + @Override public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerPlugins(), pluginRun); + } + + @Override + public void callBrokerConnectionPlugins(final ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerConnectionPlugins(), pluginRun); + } + + @Override + public void callBrokerSessionPlugins(final ActiveMQPluginRunnable<ActiveMQServerSessionPlugin> pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerSessionPlugins(), pluginRun); + } + + @Override + public void callBrokerConsumerPlugins(final ActiveMQPluginRunnable<ActiveMQServerConsumerPlugin> pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerConsumerPlugins(), pluginRun); + } + + @Override + public void callBrokerAddressPlugins(final ActiveMQPluginRunnable<ActiveMQServerAddressPlugin> pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerAddressPlugins(), pluginRun); + } + + @Override + public void callBrokerQueuePlugins(final ActiveMQPluginRunnable<ActiveMQServerQueuePlugin> pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerQueuePlugins(), pluginRun); + } + + @Override + public void callBrokerBindingPlugins(final ActiveMQPluginRunnable<ActiveMQServerBindingPlugin> pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerBindingPlugins(), pluginRun); + } + + @Override + public void callBrokerMessagePlugins(final ActiveMQPluginRunnable<ActiveMQServerMessagePlugin> pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerMessagePlugins(), pluginRun); + } + + @Override + public void callBrokerBridgePlugins(final ActiveMQPluginRunnable<ActiveMQServerBridgePlugin> pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerBridgePlugins(), pluginRun); + } + + @Override + public void callBrokerCriticalPlugins(final ActiveMQPluginRunnable<ActiveMQServerCriticalPlugin> pluginRun) throws ActiveMQException { + callBrokerPlugins(getBrokerCriticalPlugins(), pluginRun); + } + + private <P extends ActiveMQServerBasePlugin> void callBrokerPlugins(final List<P> plugins, final ActiveMQPluginRunnable<P> pluginRun) throws ActiveMQException { if (pluginRun != null) { - for (ActiveMQServerPlugin plugin : getBrokerPlugins()) { + for (P plugin : plugins) { try { pluginRun.run(plugin); } catch (Throwable e) { @@ -2054,6 +2166,51 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override + public boolean hasBrokerConnectionPlugins() { + return !getBrokerConnectionPlugins().isEmpty(); + } + + @Override + public boolean hasBrokerSessionPlugins() { + return !getBrokerSessionPlugins().isEmpty(); + } + + @Override + public boolean hasBrokerConsumerPlugins() { + return !getBrokerConsumerPlugins().isEmpty(); + } + + @Override + public boolean hasBrokerAddressPlugins() { + return !getBrokerAddressPlugins().isEmpty(); + } + + @Override + public boolean hasBrokerQueuePlugins() { + return !getBrokerQueuePlugins().isEmpty(); + } + + @Override + public boolean hasBrokerBindingPlugins() { + return !getBrokerBindingPlugins().isEmpty(); + } + + @Override + public boolean hasBrokerMessagePlugins() { + return !getBrokerMessagePlugins().isEmpty(); + } + + @Override + public boolean hasBrokerBridgePlugins() { + return !getBrokerBridgePlugins().isEmpty(); + } + + @Override + public boolean hasBrokerCriticalPlugins() { + return !getBrokerCriticalPlugins().isEmpty(); + } + + @Override public ExecutorFactory getExecutorFactory() { return executorFactory; } @@ -2854,7 +3011,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { .delayBeforeDispatch(delayBeforeDispatch) .build(); - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null); + if (hasBrokerQueuePlugins()) { + callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfig)); + } final Queue queue = queueFactory.createQueueWith(queueConfig); @@ -2898,7 +3057,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { managementService.registerQueue(queue, queue.getAddress(), storageManager); } - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null); + if (hasBrokerQueuePlugins()) { + callBrokerQueuePlugins(plugin -> plugin.afterCreateQueue(queue)); + } callPostQueueCreationCallbacks(queue.getName()); @@ -2978,7 +3139,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { .delayBeforeDispatch(delayBeforeDispatch) .build(); - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null); + if (hasBrokerQueuePlugins()) { + callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfig)); + } final Queue queue = queueFactory.createQueueWith(queueConfig); @@ -3020,7 +3183,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { managementService.registerQueue(queue, queue.getAddress(), storageManager); - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null); + if (hasBrokerQueuePlugins()) { + callBrokerQueuePlugins(plugin -> plugin.afterCreateQueue(queue)); + } callPostQueueCreationCallbacks(queue.getName()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 0d4c686..8af8aaa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1384,8 +1384,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { messagesAcknowledged.incrementAndGet(); } - if (server != null && server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); + if (server != null && server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); } } @@ -1422,8 +1422,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { messagesAcknowledged.incrementAndGet(); } - if (server != null && server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); + if (server != null && server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); } } @@ -1514,9 +1514,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { acknowledge(ref, AckReason.EXPIRED, consumer); } - if (server != null && server.hasBrokerPlugins()) { + if (server != null && server.hasBrokerMessagePlugins()) { final SimpleString expiryAddress = messageExpiryAddress; - server.callBrokerPlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer)); + server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer)); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 22bfdaf..ddd0b71 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -446,8 +446,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { try { Message message = reference.getMessage(); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeDeliver(this, reference)); + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference)); } if (message.isLargeMessage() && supportLargeMessage) { @@ -466,8 +466,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } finally { lockDelivery.readLock().unlock(); callback.afterDelivery(); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterDeliver(this, reference)); + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference)); } } @@ -489,8 +489,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); } - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeCloseConsumer(this, failed)); + if (server.hasBrokerConsumerPlugins()) { + server.callBrokerConsumerPlugins(plugin -> plugin.beforeCloseConsumer(this, failed)); } setStarted(false); @@ -550,8 +550,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { managementService.sendNotification(notification); } - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterCloseConsumer(this, failed)); + if (server.hasBrokerConsumerPlugins()) { + server.callBrokerConsumerPlugins(plugin -> plugin.afterCloseConsumer(this, failed)); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index c0bca6b..4910e66 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -356,8 +356,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } synchronized (this) { if (!closed) { - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeCloseSession(this, failed)); + if (server.hasBrokerSessionPlugins()) { + server.callBrokerSessionPlugins(plugin -> plugin.beforeCloseSession(this, failed)); } } this.setStarted(false); @@ -412,8 +412,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { closed = true; - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterCloseSession(this, failed)); + if (server.hasBrokerSessionPlugins()) { + server.callBrokerSessionPlugins(plugin -> plugin.afterCloseSession(this, failed)); } } } @@ -470,16 +470,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener { Filter filter = FilterImpl.createFilter(filterString); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeCreateConsumer(consumerID, (QueueBinding) binding, + if (server.hasBrokerConsumerPlugins()) { + server.callBrokerConsumerPlugins(plugin -> plugin.beforeCreateConsumer(consumerID, (QueueBinding) binding, filterString, browseOnly, supportLargeMessage)); } ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); consumers.put(consumer.getID(), consumer); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterCreateConsumer(consumer)); + if (server.hasBrokerConsumerPlugins()) { + server.callBrokerConsumerPlugins(plugin -> plugin.afterCreateConsumer(consumer)); } if (!browseOnly) { @@ -1422,8 +1422,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { message = msg; } - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue)); + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue)); } // If the protocol doesn't support flow control, we have no choice other than fail the communication @@ -1470,8 +1470,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { result = doSend(tx, message, address, direct, noAutoCreateQueue); } - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result)); + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result)); } return result; @@ -1504,8 +1504,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public void addMetaData(String key, String data) throws Exception { - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.beforeSessionMetadataAdded(this, key, data)); + if (server.hasBrokerSessionPlugins()) { + server.callBrokerSessionPlugins(plugin -> plugin.beforeSessionMetadataAdded(this, key, data)); } if (metaData == null) { @@ -1513,8 +1513,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } metaData.put(key, data); - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.afterSessionMetadataAdded(this, key, data)); + if (server.hasBrokerSessionPlugins()) { + server.callBrokerSessionPlugins(plugin -> plugin.afterSessionMetadataAdded(this, key, data)); } } @@ -1523,8 +1523,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { ServerSession sessionWithMetaData = server.lookupSession(key, data); if (sessionWithMetaData != null && sessionWithMetaData != this) { // There is a duplication of this property - if (server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.duplicateSessionMetadataFailure(this, key, data)); + if (server.hasBrokerSessionPlugins()) { + server.callBrokerSessionPlugins(plugin -> plugin.duplicateSessionMetadataFailure(this, key, data)); } return false; } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java index 4abe95d..c51a7f3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.server.plugin; import org.apache.activemq.artemis.api.core.ActiveMQException; -public interface ActiveMQPluginRunnable { +public interface ActiveMQPluginRunnable<P extends ActiveMQServerBasePlugin> { - void run(ActiveMQServerPlugin plugin) throws ActiveMQException; + void run(P plugin) throws ActiveMQException; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerAddressPlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerAddressPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerAddressPlugin.java new file mode 100644 index 0000000..2a431e4 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerAddressPlugin.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +import java.util.EnumSet; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; + +/** + * + */ +public interface ActiveMQServerAddressPlugin extends ActiveMQServerBasePlugin { + + /** + * Before an address is added tot he broker + * + * @param addressInfo The addressInfo that will be added + * @param reload If the address is being reloaded + * @throws ActiveMQException + */ + default void beforeAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException { + + } + + /** + * After an address has been added tot he broker + * + * @param addressInfo The newly added address + * @param reload If the address is being reloaded + * @throws ActiveMQException + */ + default void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException { + + } + + + /** + * Before an address is updated + * + * @param address The existing address info that is about to be updated + * @param routingTypes The new routing types that the address will be updated with + * @throws ActiveMQException + */ + default void beforeUpdateAddress(SimpleString address, EnumSet<RoutingType> routingTypes) throws ActiveMQException { + + } + + /** + * After an address has been updated + * + * @param addressInfo The newly updated address info + * @throws ActiveMQException + */ + default void afterUpdateAddress(AddressInfo addressInfo) throws ActiveMQException { + + } + + /** + * Before an address is removed + * + * @param address The address that will be removed + * @throws ActiveMQException + */ + default void beforeRemoveAddress(SimpleString address) throws ActiveMQException { + + } + + /** + * After an address has been removed + * + * @param address The address that has been removed + * @param addressInfo The address info that has been removed or null if not removed + * @throws ActiveMQException + */ + default void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException { + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBasePlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBasePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBasePlugin.java new file mode 100644 index 0000000..3f29afc --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBasePlugin.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +import java.util.Map; +import org.apache.activemq.artemis.core.server.ActiveMQServer; + + +/** + * + */ +public interface ActiveMQServerBasePlugin { + + /** + * used to pass configured properties to Plugin + * + * @param properties + */ + default void init(Map<String, String> properties) { + } + + /** + * The plugin has been registered with the server + * + * @param server The ActiveMQServer the plugin has been registered to + */ + default void registered(ActiveMQServer server) { + } + + /** + * The plugin has been unregistered with the server + * + * @param server The ActiveMQServer the plugin has been unregistered to + */ + default void unregistered(ActiveMQServer server) { + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBindingPlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBindingPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBindingPlugin.java new file mode 100644 index 0000000..44efd79 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBindingPlugin.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.transaction.Transaction; + +/** + * + */ +public interface ActiveMQServerBindingPlugin extends ActiveMQServerBasePlugin { + + /** + * Before a binding is added + * + * @param binding + * @throws ActiveMQException + */ + default void beforeAddBinding(Binding binding) throws ActiveMQException { + + } + + /** + * After a binding has been added + * + * @param binding The newly added binding + * @throws ActiveMQException + */ + default void afterAddBinding(Binding binding) throws ActiveMQException { + + } + + /** + * Before a binding is removed + * + * @param uniqueName + * @param tx + * @param deleteData + * @throws ActiveMQException + */ + default void beforeRemoveBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws ActiveMQException { + + } + + /** + * After a binding is removed + * + * @param binding + * @param tx + * @param deleteData + * @throws ActiveMQException + */ + default void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException { + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBridgePlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBridgePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBridgePlugin.java new file mode 100644 index 0000000..ca2493a --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerBridgePlugin.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.server.HandleStatus; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.cluster.Bridge; + +/** + * + */ +public interface ActiveMQServerBridgePlugin extends ActiveMQServerBasePlugin { + + /** + * Before a bridge is deployed + * + * @param config The bridge configuration + * @throws ActiveMQException + */ + default void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException { + + } + + /** + * After a bridge has been deployed + * + * @param bridge The newly deployed bridge + * @throws ActiveMQException + */ + default void afterDeployBridge(Bridge bridge) throws ActiveMQException { + + } + + /** + * Called immediately before a bridge delivers a message + * + * @param bridge + * @param ref + * @throws ActiveMQException + */ + default void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException { + + } + + /** + * Called immediately after a bridge delivers a message but before the message + * is acknowledged + * + * @param bridge + * @param ref + * @param status + * @throws ActiveMQException + */ + default void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException { + + } + + /** + * Called after delivered message over this bridge has been acknowledged by the remote broker + * + * @param bridge + * @param ref + * @throws ActiveMQException + */ + default void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException { + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerConnectionPlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerConnectionPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerConnectionPlugin.java new file mode 100644 index 0000000..3489f2d --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerConnectionPlugin.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; + +/** + * + */ +public interface ActiveMQServerConnectionPlugin extends ActiveMQServerBasePlugin { + + + + /** + * A connection has been created. + * + * @param connection The newly created connection + * @throws ActiveMQException + */ + default void afterCreateConnection(RemotingConnection connection) throws ActiveMQException { + + } + + /** + * A connection has been destroyed. + * + * @param connection + * @throws ActiveMQException + */ + default void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException { + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerConsumerPlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerConsumerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerConsumerPlugin.java new file mode 100644 index 0000000..93b9c4f --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerConsumerPlugin.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.server.ServerConsumer; + +/** + * + */ +public interface ActiveMQServerConsumerPlugin extends ActiveMQServerBasePlugin { + + /** + * Before a consumer is created + * + * @param consumerID + * @param queueName + * @param filterString + * @param browseOnly + * @param supportLargeMessage + * @throws ActiveMQException + * + * @deprecated use {@link #beforeCreateConsumer(long, QueueBinding, SimpleString, boolean, boolean) + */ + @Deprecated + default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, + boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException { + + } + + + /** + * + * Before a consumer is created + * + * @param consumerID + * @param QueueBinding + * @param filterString + * @param browseOnly + * @param supportLargeMessage + * @throws ActiveMQException + */ + default void beforeCreateConsumer(long consumerID, QueueBinding queueBinding, SimpleString filterString, + boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException { + //by default call the old method for backwards compatibility + this.beforeCreateConsumer(consumerID, queueBinding.getQueue().getName(), filterString, browseOnly, supportLargeMessage); + } + + /** + * After a consumer has been created + * + * @param consumer the created consumer + * @throws ActiveMQException + */ + default void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException { + + } + + /** + * Before a consumer is closed + * + * @param consumer + * @param failed + * @throws ActiveMQException + */ + default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException { + + } + + /** + * After a consumer is closed + * + * @param consumer + * @param failed + * @throws ActiveMQException + */ + default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException { + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerCriticalPlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerCriticalPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerCriticalPlugin.java new file mode 100644 index 0000000..0a32c1a --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerCriticalPlugin.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.utils.critical.CriticalComponent; + +/** + * + */ +public interface ActiveMQServerCriticalPlugin extends ActiveMQServerBasePlugin { + + /** + * A Critical failure has been detected. + * This will be called before the broker is stopped + * @param components + * @throws ActiveMQException + */ + default void criticalFailure(CriticalComponent components) throws ActiveMQException { + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19e1bbeb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java new file mode 100644 index 0000000..aef0970 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.plugin; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.transaction.Transaction; + +/** + * + */ +public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin { + + /** + * Before a message is sent + * + * @param session the session that sends the message + * @param tx + * @param message + * @param direct + * @param noAutoCreateQueue + * @throws ActiveMQException + */ + default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { + //by default call the old method for backwards compatibility + this.beforeSend(tx, message, direct, noAutoCreateQueue); + } + + /** + * After a message is sent + * + * @param session the session that sends the message + * @param tx + * @param message + * @param direct + * @param noAutoCreateQueue + * @param result + * @throws ActiveMQException + */ + default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, + RoutingStatus result) throws ActiveMQException { + //by default call the old method for backwards compatibility + this.afterSend(tx, message, direct, noAutoCreateQueue, result); + } + + + /** + * Before a message is sent + * + * @param tx + * @param message + * @param direct + * @param noAutoCreateQueue + * @throws ActiveMQException + * + * @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)} + */ + @Deprecated + default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { + + } + + /** + * After a message is sent + * + * @param tx + * @param message + * @param direct + * @param noAutoCreateQueue + * @param result + * @throws ActiveMQException + * + * @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)} + */ + @Deprecated + default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, + RoutingStatus result) throws ActiveMQException { + + } + + /** + * Before a message is routed + * + * @param message + * @param context + * @param direct + * @param rejectDuplicates + * @throws ActiveMQException + */ + default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException { + + } + + /** + * After a message is routed + * + * @param message + * @param context + * @param direct + * @param rejectDuplicates + * @param result + * @throws ActiveMQException + */ + default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, + RoutingStatus result) throws ActiveMQException { + + } + + /** + * Before a message is delivered to a client consumer + * + * @param consumer the consumer the message will be delivered to + * @param reference message reference + * @throws ActiveMQException + */ + default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { + //by default call the old method for backwards compatibility + this.beforeDeliver(reference); + } + + /** + * After a message is delivered to a client consumer + * + * @param consumer the consumer the message was delivered to + * @param reference message reference + * @throws ActiveMQException + */ + default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { + //by default call the old method for backwards compatibility + this.afterDeliver(reference); + } + + /** + * Before a message is delivered to a client consumer + * + * @param reference + * @throws ActiveMQException + * + * @deprecated use throws ActiveMQException {@link #beforeDeliver(ServerConsumer, MessageReference)} + */ + @Deprecated + default void beforeDeliver(MessageReference reference) throws ActiveMQException { + + } + + /** + * After a message is delivered to a client consumer + * + * @param reference + * @throws ActiveMQException + * + * @deprecated use {@link #afterDeliver(ServerConsumer, MessageReference)} + */ + @Deprecated + default void afterDeliver(MessageReference reference) throws ActiveMQException { + + } + + /** + * A message has been expired + * + * @param message The expired message + * @param messageExpiryAddress The message expiry address if exists + * @throws ActiveMQException + * + * @deprecated use {@link #messageExpired(MessageReference, SimpleString, ServerConsumer)} + */ + @Deprecated + default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException { + + } + + /** + * A message has been expired + * + * @param message The expired message + * @param messageExpiryAddress The message expiry address if exists + * @param consumer the Consumer that acknowledged the message - this field is optional + * and can be null + * @throws ActiveMQException + */ + default void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) throws ActiveMQException { + messageExpired(message, messageExpiryAddress); + } + + /** + * A message has been acknowledged + * + * @param ref The acked message + * @param reason The ack reason + * @throws ActiveMQException + * + * @deprecated use {@link #messageAcknowledged(MessageReference, AckReason, ServerConsumer)} + */ + @Deprecated + default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException { + + } + + /** + * A message has been acknowledged + * + * @param ref The acked message + * @param reason The ack reason + * @param consumer the Consumer that acknowledged the message - this field is optional + * and can be null + * @throws ActiveMQException + * + */ + default void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException { + //by default call the old method for backwards compatibility + this.messageAcknowledged(ref, reason); + } +}
