remove JMS JMX Objects and add new Address JMX objects
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ae40a3d3 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ae40a3d3 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ae40a3d3 Branch: refs/heads/ARTEMIS-780 Commit: ae40a3d36546e0cd29948b1cfacb55cf084dffb4 Parents: 63b5a58 Author: Andy Taylor <[email protected]> Authored: Sun Nov 6 10:43:16 2016 +0000 Committer: Clebert Suconic <[email protected]> Committed: Mon Nov 7 11:29:24 2016 -0500 ---------------------------------------------------------------------- .../core/management/ActiveMQServerControl.java | 5 + .../api/core/management/AddressControl.java | 27 + .../api/core/management/ObjectNameBuilder.java | 34 +- .../api/core/management/QueueControl.java | 5 +- .../api/core/management/ResourceNames.java | 5 +- .../impl/JMSConnectionFactoryControlImpl.java | 471 ---------- .../management/impl/JMSQueueControlImpl.java | 532 ----------- .../management/impl/JMSServerControlImpl.java | 876 ------------------- .../management/impl/JMSTopicControlImpl.java | 367 -------- .../openmbean/JMSCompositeDataConstants.java | 57 -- .../impl/openmbean/JMSOpenTypeSupport.java | 357 -------- .../jms/server/impl/JMSServerManagerImpl.java | 30 - .../server/management/JMSManagementService.java | 48 - .../impl/JMSManagementServiceImpl.java | 155 ---- .../impl/ActiveMQServerControlImpl.java | 13 + .../management/impl/AddressControlImpl.java | 105 ++- .../core/management/impl/QueueControlImpl.java | 12 +- .../core/postoffice/impl/PostOfficeImpl.java | 4 +- .../core/server/impl/ActiveMQServerImpl.java | 3 - .../artemis/core/server/impl/AddressInfo.java | 7 + .../server/impl/PostOfficeJournalLoader.java | 4 +- .../server/management/ManagementService.java | 3 +- .../management/impl/ManagementServiceImpl.java | 15 +- .../group/impl/ClusteredResetMockTest.java | 3 +- .../tests/extras/jms/bridge/BridgeTestBase.java | 6 +- .../crossprotocol/AMQPToOpenwireTest.java | 1 - .../management/ActiveMQServerControlTest.java | 12 +- .../ActiveMQServerControlUsingCoreTest.java | 5 + .../management/DivertControlTest.java | 8 +- .../management/DivertControlUsingCoreTest.java | 2 +- .../management/ManagementControlHelper.java | 4 +- .../management/ManagementServiceImplTest.java | 3 +- .../management/QueueControlTest.java | 4 +- .../management/QueueControlUsingCoreTest.java | 14 +- .../tests/tools/container/LocalTestServer.java | 17 +- .../activemq/artemis/common/AbstractAdmin.java | 10 +- .../activemq/artemis/jms/ActiveMQCoreAdmin.java | 3 - 37 files changed, 237 insertions(+), 2990 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae40a3d3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 87a4a79..30e8bc5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -424,6 +424,11 @@ public interface ActiveMQServerControl { // Operations ---------------------------------------------------- + @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION) + void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, + @Parameter(name = "routingType", desc = "the routing type of the address either 0 for multicast or 1 for anycast") int routingType, + @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers, + @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception; /** * Create a durable queue. * <br> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae40a3d3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java index fbecf25..e7a02ad 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.api.core.management; +import javax.management.MBeanOperationInfo; +import java.util.Map; + /** * An AddressControl is used to manage an address. */ @@ -27,6 +30,12 @@ public interface AddressControl { @Attribute(desc = "managed address") String getAddress(); + /* + * The routing type of this address, either multicast (topic subscriptions) or anycast (queue semantics). + * */ + @Attribute(desc = "The routing type of this address") + String getRoutingType(); + /** * Returns the roles (name and permissions) associated with this address. */ @@ -85,4 +94,22 @@ public interface AddressControl { */ @Attribute(desc = "names of all bindings (both queues and diverts) bound to this address") String[] getBindingNames() throws Exception; + + + /** + * @param headers the message headers and properties to set. Can only + * container Strings maped to primitive types. + * @param body the text to send + * @param durable + * @param user + * @param password @return + * @throws Exception + */ + @Operation(desc = "Sends a TextMessage to a password-protected address.", impact = MBeanOperationInfo.ACTION) + String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message") Map<String, String> headers, + @Parameter(name = "headers", desc = "A type for the message") final int type, + @Parameter(name = "body", desc = "The body (byte[]) of the message encoded as a string using Base64") String body, + @Parameter(name = "durable", desc = "Whether the message is durable") boolean durable, + @Parameter(name = "user", desc = "The user to authenticate with") String user, + @Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae40a3d3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java index ef7b483..019996a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java @@ -33,10 +33,6 @@ public final class ObjectNameBuilder { */ public static final ObjectNameBuilder DEFAULT = new ObjectNameBuilder(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "localhost", true); - static final String JMS_MODULE = "JMS"; - - static final String CORE_MODULE = "Core"; - // Attributes ---------------------------------------------------- private final String domain; @@ -85,7 +81,7 @@ public final class ObjectNameBuilder { * Returns the ObjectName used by the single {@link ActiveMQServerControl}. */ public ObjectName getActiveMQServerObjectName() throws Exception { - return ObjectName.getInstance(domain + ":" + getBrokerProperties() + "module=Core," + getObjectType() + "=Server"); + return ObjectName.getInstance(domain + ":" + getBrokerProperties() + getObjectType() + "=Broker"); } /** @@ -94,7 +90,7 @@ public final class ObjectNameBuilder { * @see AddressControl */ public ObjectName getAddressObjectName(final SimpleString address) throws Exception { - return createObjectName(ObjectNameBuilder.CORE_MODULE, "Address", address.toString()); + return createObjectName("Address", address.toString()); } /** @@ -103,7 +99,7 @@ public final class ObjectNameBuilder { * @see QueueControl */ public ObjectName getQueueObjectName(final SimpleString address, final SimpleString name) throws Exception { - return ObjectName.getInstance(String.format("%s:" + getBrokerProperties() + "module=%s," + getObjectType() + "=%s,address=%s,name=%s", domain, ObjectNameBuilder.CORE_MODULE, "Queue", ObjectName.quote(address.toString()), ObjectName.quote(name.toString()))); + return ObjectName.getInstance(String.format("%s:" + getBrokerProperties() + "parentType=%s,parentName=%s," + getObjectType() + "=%s,name=%s", domain, "Address", ObjectName.quote(address.toString()), "Queue", ObjectName.quote(name.toString()))); } /** @@ -111,8 +107,8 @@ public final class ObjectNameBuilder { * * @see DivertControl */ - public ObjectName getDivertObjectName(final String name) throws Exception { - return createObjectName(ObjectNameBuilder.CORE_MODULE, "Divert", name); + public ObjectName getDivertObjectName(final String name, String address) throws Exception { + return ObjectName.getInstance(String.format("%s:" + getBrokerProperties() + "parentType=%s,parentName=%s," + getObjectType() + "=%s,name=%s", domain, "Address", ObjectName.quote(address.toString()), "Divert", ObjectName.quote(name.toString()))); } /** @@ -121,7 +117,7 @@ public final class ObjectNameBuilder { * @see AcceptorControl */ public ObjectName getAcceptorObjectName(final String name) throws Exception { - return createObjectName(ObjectNameBuilder.CORE_MODULE, "Acceptor", name); + return createObjectName("Acceptor", name); } /** @@ -130,7 +126,7 @@ public final class ObjectNameBuilder { * @see BroadcastGroupControl */ public ObjectName getBroadcastGroupObjectName(final String name) throws Exception { - return createObjectName(ObjectNameBuilder.CORE_MODULE, "BroadcastGroup", name); + return createObjectName("BroadcastGroup", name); } /** @@ -139,7 +135,7 @@ public final class ObjectNameBuilder { * @see BridgeControl */ public ObjectName getBridgeObjectName(final String name) throws Exception { - return createObjectName(ObjectNameBuilder.CORE_MODULE, "Bridge", name); + return createObjectName("Bridge", name); } /** @@ -148,14 +144,14 @@ public final class ObjectNameBuilder { * @see ClusterConnectionControl */ public ObjectName getClusterConnectionObjectName(final String name) throws Exception { - return createObjectName(ObjectNameBuilder.CORE_MODULE, "ClusterConnection", name); + return createObjectName("ClusterConnection", name); } /** * Returns the ObjectName used by DiscoveryGroupControl. */ public ObjectName getDiscoveryGroupObjectName(final String name) throws Exception { - return createObjectName(ObjectNameBuilder.CORE_MODULE, "DiscoveryGroup", name); + return createObjectName("DiscoveryGroup", name); } /** @@ -169,25 +165,25 @@ public final class ObjectNameBuilder { * Returns the ObjectName used by JMSQueueControl. */ public ObjectName getJMSQueueObjectName(final String name) throws Exception { - return createObjectName(ObjectNameBuilder.JMS_MODULE, "Queue", name); + return createObjectName("Queue", name); } /** * Returns the ObjectName used by TopicControl. */ public ObjectName getJMSTopicObjectName(final String name) throws Exception { - return createObjectName(ObjectNameBuilder.JMS_MODULE, "Topic", name); + return createObjectName("Topic", name); } /** * Returns the ObjectName used by ConnectionFactoryControl. */ public ObjectName getConnectionFactoryObjectName(final String name) throws Exception { - return createObjectName(ObjectNameBuilder.JMS_MODULE, "ConnectionFactory", name); + return createObjectName("ConnectionFactory", name); } - private ObjectName createObjectName(final String module, final String type, final String name) throws Exception { - String format = String.format("%s:" + getBrokerProperties() + "module=%s," + getObjectType() + "=%s,name=%s", domain, module, type, ObjectName.quote(name)); + private ObjectName createObjectName(final String type, final String name) throws Exception { + String format = String.format("%s:" + getBrokerProperties() + getObjectType() + "=%s,name=%s", domain, type, ObjectName.quote(name)); return ObjectName.getInstance(format); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae40a3d3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 3336aae..bbf365c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -338,7 +338,6 @@ public interface QueueControl { * @param headers the message headers and properties to set. Can only * container Strings maped to primitive types. * @param body the text to send - * @param userID * @param durable * @param user * @param password @return @@ -348,7 +347,6 @@ public interface QueueControl { String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message") Map<String, String> headers, @Parameter(name = "headers", desc = "A type for the message") final int type, @Parameter(name = "body", desc = "The body (byte[]) of the message encoded as a string using Base64") String body, - @Parameter(name = "body", desc = "The user ID to set on the message") String userID, @Parameter(name = "durable", desc = "Whether the message is durable") boolean durable, @Parameter(name = "user", desc = "The user to authenticate with") String user, @Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception; @@ -431,6 +429,9 @@ public interface QueueControl { @Attribute(desc = "whether the queue is paused") boolean isPaused() throws Exception; + @Operation(desc = "Browse Messages", impact = MBeanOperationInfo.ACTION) + CompositeData[] browse() throws Exception; + /** * Resets the MessagesAdded property */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae40a3d3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java index a8c7632..716c6c1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java @@ -23,10 +23,11 @@ package org.apache.activemq.artemis.api.core.management; * For example, the resource name of the "foo" queue is {@code CORE_QUEUE + "foo"}. */ public final class ResourceNames { + public static final String ADDRESS = "address."; - public static final String CORE_SERVER = "core.server"; + public static final String CORE_SERVER = "broker"; - public static final String CORE_QUEUE = "core.queue."; + public static final String CORE_QUEUE = "queue."; public static final String CORE_ADDRESS = "core.address."; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae40a3d3/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSConnectionFactoryControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSConnectionFactoryControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSConnectionFactoryControlImpl.java deleted file mode 100644 index 3175b9c..0000000 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSConnectionFactoryControlImpl.java +++ /dev/null @@ -1,471 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.jms.management.impl; - -import javax.management.MBeanInfo; -import javax.management.NotCompliantMBeanException; -import javax.management.StandardMBean; - -import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.api.core.management.Parameter; -import org.apache.activemq.artemis.api.jms.management.ConnectionFactoryControl; -import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.jms.server.JMSServerManager; -import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration; - -public class JMSConnectionFactoryControlImpl extends StandardMBean implements ConnectionFactoryControl { - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - private final ConnectionFactoryConfiguration cfConfig; - - private ActiveMQConnectionFactory cf; - - private final String name; - - private final JMSServerManager jmsManager; - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - public JMSConnectionFactoryControlImpl(final ConnectionFactoryConfiguration cfConfig, - final ActiveMQConnectionFactory cf, - final JMSServerManager jmsManager, - final String name) throws NotCompliantMBeanException { - super(ConnectionFactoryControl.class); - this.cfConfig = cfConfig; - this.cf = cf; - this.name = name; - this.jmsManager = jmsManager; - } - - // Public -------------------------------------------------------- - - // ManagedConnectionFactoryMBean implementation ------------------ - - @Override - public String[] getRegistryBindings() { - return jmsManager.getBindingsOnConnectionFactory(name); - } - - @Override - public boolean isCompressLargeMessages() { - return cf.isCompressLargeMessage(); - } - - @Override - public void setCompressLargeMessages(final boolean compress) { - cfConfig.setCompressLargeMessages(compress); - recreateCF(); - } - - @Override - public boolean isHA() { - return cfConfig.isHA(); - } - - @Override - public int getFactoryType() { - return cfConfig.getFactoryType().intValue(); - } - - @Override - public String getClientID() { - return cfConfig.getClientID(); - } - - @Override - public long getClientFailureCheckPeriod() { - return cfConfig.getClientFailureCheckPeriod(); - } - - @Override - public void setClientID(String clientID) { - cfConfig.setClientID(clientID); - recreateCF(); - } - - @Override - public void setDupsOKBatchSize(int dupsOKBatchSize) { - cfConfig.setDupsOKBatchSize(dupsOKBatchSize); - recreateCF(); - } - - @Override - public void setTransactionBatchSize(int transactionBatchSize) { - cfConfig.setTransactionBatchSize(transactionBatchSize); - recreateCF(); - } - - @Override - public void setClientFailureCheckPeriod(long clientFailureCheckPeriod) { - cfConfig.setClientFailureCheckPeriod(clientFailureCheckPeriod); - recreateCF(); - } - - @Override - public void setConnectionTTL(long connectionTTL) { - cfConfig.setConnectionTTL(connectionTTL); - recreateCF(); - } - - @Override - public void setCallTimeout(long callTimeout) { - cfConfig.setCallTimeout(callTimeout); - recreateCF(); - } - - @Override - public void setCallFailoverTimeout(long callTimeout) { - cfConfig.setCallFailoverTimeout(callTimeout); - recreateCF(); - } - - @Override - public void setConsumerWindowSize(int consumerWindowSize) { - cfConfig.setConsumerWindowSize(consumerWindowSize); - recreateCF(); - } - - @Override - public void setConsumerMaxRate(int consumerMaxRate) { - cfConfig.setConsumerMaxRate(consumerMaxRate); - recreateCF(); - } - - @Override - public void setConfirmationWindowSize(int confirmationWindowSize) { - cfConfig.setConfirmationWindowSize(confirmationWindowSize); - recreateCF(); - } - - @Override - public void setProducerMaxRate(int producerMaxRate) { - cfConfig.setProducerMaxRate(producerMaxRate); - recreateCF(); - } - - @Override - public int getProducerWindowSize() { - return cfConfig.getProducerWindowSize(); - } - - @Override - public void setProducerWindowSize(int producerWindowSize) { - cfConfig.setProducerWindowSize(producerWindowSize); - recreateCF(); - } - - @Override - public void setCacheLargeMessagesClient(boolean cacheLargeMessagesClient) { - cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient); - recreateCF(); - } - - @Override - public boolean isCacheLargeMessagesClient() { - return cfConfig.isCacheLargeMessagesClient(); - } - - @Override - public void setMinLargeMessageSize(int minLargeMessageSize) { - cfConfig.setMinLargeMessageSize(minLargeMessageSize); - recreateCF(); - } - - @Override - public void setBlockOnNonDurableSend(boolean blockOnNonDurableSend) { - cfConfig.setBlockOnNonDurableSend(blockOnNonDurableSend); - recreateCF(); - } - - @Override - public void setBlockOnAcknowledge(boolean blockOnAcknowledge) { - cfConfig.setBlockOnAcknowledge(blockOnAcknowledge); - recreateCF(); - } - - @Override - public void setBlockOnDurableSend(boolean blockOnDurableSend) { - cfConfig.setBlockOnDurableSend(blockOnDurableSend); - recreateCF(); - } - - @Override - public void setAutoGroup(boolean autoGroup) { - cfConfig.setAutoGroup(autoGroup); - recreateCF(); - } - - @Override - public void setPreAcknowledge(boolean preAcknowledge) { - cfConfig.setPreAcknowledge(preAcknowledge); - recreateCF(); - } - - @Override - public void setMaxRetryInterval(long retryInterval) { - cfConfig.setMaxRetryInterval(retryInterval); - recreateCF(); - } - - @Override - public void setRetryIntervalMultiplier(double retryIntervalMultiplier) { - cfConfig.setRetryIntervalMultiplier(retryIntervalMultiplier); - recreateCF(); - } - - @Override - public void setReconnectAttempts(int reconnectAttempts) { - cfConfig.setReconnectAttempts(reconnectAttempts); - recreateCF(); - } - - @Override - public void setFailoverOnInitialConnection(boolean failover) { - cfConfig.setFailoverOnInitialConnection(failover); - recreateCF(); - } - - @Override - public boolean isUseGlobalPools() { - return cfConfig.isUseGlobalPools(); - } - - @Override - public void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize) { - cfConfig.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize); - recreateCF(); - } - - @Override - public int getThreadPoolMaxSize() { - return cfConfig.getThreadPoolMaxSize(); - } - - @Override - public void setThreadPoolMaxSize(int threadPoolMaxSize) { - cfConfig.setThreadPoolMaxSize(threadPoolMaxSize); - recreateCF(); - } - - @Override - public int getInitialMessagePacketSize() { - return cf.getInitialMessagePacketSize(); - } - - @Override - public void setGroupID(String groupID) { - cfConfig.setGroupID(groupID); - recreateCF(); - } - - @Override - public String getGroupID() { - return cfConfig.getGroupID(); - } - - @Override - public void setUseGlobalPools(boolean useGlobalPools) { - cfConfig.setUseGlobalPools(useGlobalPools); - recreateCF(); - } - - @Override - public int getScheduledThreadPoolMaxSize() { - return cfConfig.getScheduledThreadPoolMaxSize(); - } - - @Override - public void setRetryInterval(long retryInterval) { - cfConfig.setRetryInterval(retryInterval); - recreateCF(); - } - - @Override - public long getMaxRetryInterval() { - return cfConfig.getMaxRetryInterval(); - } - - @Override - public String getConnectionLoadBalancingPolicyClassName() { - return cfConfig.getLoadBalancingPolicyClassName(); - } - - @Override - public void setConnectionLoadBalancingPolicyClassName(String name) { - cfConfig.setLoadBalancingPolicyClassName(name); - recreateCF(); - } - - @Override - public TransportConfiguration[] getStaticConnectors() { - return cf.getStaticConnectors(); - } - - @Override - public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration() { - return cf.getDiscoveryGroupConfiguration(); - } - - @Override - public void addBinding(@Parameter(name = "binding", desc = "the name of the binding for the Registry") String binding) throws Exception { - jmsManager.addConnectionFactoryToBindingRegistry(name, binding); - } - - @Override - public void removeBinding(@Parameter(name = "binding", desc = "the name of the binding for the Registry") String binding) throws Exception { - jmsManager.removeConnectionFactoryFromBindingRegistry(name, binding); - } - - @Override - public long getCallTimeout() { - return cfConfig.getCallTimeout(); - } - - @Override - public long getCallFailoverTimeout() { - return cfConfig.getCallFailoverTimeout(); - } - - @Override - public int getConsumerMaxRate() { - return cfConfig.getConsumerMaxRate(); - } - - @Override - public int getConsumerWindowSize() { - return cfConfig.getConsumerWindowSize(); - } - - @Override - public int getProducerMaxRate() { - return cfConfig.getProducerMaxRate(); - } - - @Override - public int getConfirmationWindowSize() { - return cfConfig.getConfirmationWindowSize(); - } - - @Override - public int getDupsOKBatchSize() { - return cfConfig.getDupsOKBatchSize(); - } - - @Override - public boolean isBlockOnAcknowledge() { - return cfConfig.isBlockOnAcknowledge(); - } - - @Override - public boolean isBlockOnNonDurableSend() { - return cfConfig.isBlockOnNonDurableSend(); - } - - @Override - public boolean isBlockOnDurableSend() { - return cfConfig.isBlockOnDurableSend(); - } - - @Override - public boolean isPreAcknowledge() { - return cfConfig.isPreAcknowledge(); - } - - @Override - public String getName() { - return name; - } - - @Override - public long getConnectionTTL() { - return cfConfig.getConnectionTTL(); - } - - @Override - public int getReconnectAttempts() { - return cfConfig.getReconnectAttempts(); - } - - @Override - public boolean isFailoverOnInitialConnection() { - return cfConfig.isFailoverOnInitialConnection(); - } - - @Override - public int getMinLargeMessageSize() { - return cfConfig.getMinLargeMessageSize(); - } - - @Override - public long getRetryInterval() { - return cfConfig.getRetryInterval(); - } - - @Override - public double getRetryIntervalMultiplier() { - return cfConfig.getRetryIntervalMultiplier(); - } - - @Override - public int getTransactionBatchSize() { - return cfConfig.getTransactionBatchSize(); - } - - @Override - public void setProtocolManagerFactoryStr(String protocolManagerFactoryStr) { - cfConfig.setProtocolManagerFactoryStr(protocolManagerFactoryStr); - recreateCF(); - } - - @Override - public String getProtocolManagerFactoryStr() { - return cfConfig.getProtocolManagerFactoryStr(); - } - - @Override - public boolean isAutoGroup() { - return cfConfig.isAutoGroup(); - } - - @Override - public MBeanInfo getMBeanInfo() { - MBeanInfo info = super.getMBeanInfo(); - return new MBeanInfo(info.getClassName(), info.getDescription(), info.getAttributes(), info.getConstructors(), MBeanInfoHelper.getMBeanOperationsInfo(ConnectionFactoryControl.class), info.getNotifications()); - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - private void recreateCF() { - try { - this.cf = jmsManager.recreateCF(this.name, this.cfConfig); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae40a3d3/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java deleted file mode 100644 index 36cba96..0000000 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java +++ /dev/null @@ -1,532 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.jms.management.impl; - -import javax.jms.InvalidSelectorException; -import javax.json.JsonArrayBuilder; -import javax.management.MBeanInfo; -import javax.management.StandardMBean; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException; -import org.apache.activemq.artemis.api.core.FilterConstants; -import org.apache.activemq.artemis.api.core.JsonUtil; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.management.MessageCounterInfo; -import org.apache.activemq.artemis.api.core.management.Operation; -import org.apache.activemq.artemis.api.core.management.QueueControl; -import org.apache.activemq.artemis.api.jms.management.JMSQueueControl; -import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper; -import org.apache.activemq.artemis.core.messagecounter.MessageCounter; -import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.jms.client.ActiveMQMessage; -import org.apache.activemq.artemis.jms.management.impl.openmbean.JMSOpenTypeSupport; -import org.apache.activemq.artemis.jms.server.JMSServerManager; -import org.apache.activemq.artemis.utils.Base64; -import org.apache.activemq.artemis.utils.JsonLoader; -import org.apache.activemq.artemis.utils.SelectorTranslator; -import org.apache.activemq.artemis.utils.UUIDGenerator; - -public class JMSQueueControlImpl extends StandardMBean implements JMSQueueControl { - - private final ActiveMQDestination managedQueue; - - private final JMSServerManager jmsServerManager; - - private final QueueControl coreQueueControl; - - private final MessageCounter counter; - - // Static -------------------------------------------------------- - - /** - * Returns null if the string is null or empty - */ - public static String createFilterFromJMSSelector(final String selectorStr) throws ActiveMQException { - return selectorStr == null || selectorStr.trim().length() == 0 ? null : SelectorTranslator.convertToActiveMQFilterString(selectorStr); - } - - private static String createFilterForJMSMessageID(final String jmsMessageID) throws Exception { - return FilterConstants.ACTIVEMQ_USERID + " = '" + jmsMessageID + "'"; - } - - static String toJSON(final Map<String, Object>[] messages) { - JsonArrayBuilder array = JsonLoader.createArrayBuilder(); - for (Map<String, Object> message : messages) { - array.add(JsonUtil.toJsonObject(message)); - } - return array.build().toString(); - } - - // Constructors -------------------------------------------------- - - public JMSQueueControlImpl(final ActiveMQDestination managedQueue, - final QueueControl coreQueueControl, - final JMSServerManager jmsServerManager, - final MessageCounter counter) throws Exception { - super(JMSQueueControl.class); - this.managedQueue = managedQueue; - this.jmsServerManager = jmsServerManager; - this.coreQueueControl = coreQueueControl; - this.counter = counter; - } - - // Public -------------------------------------------------------- - - // ManagedJMSQueueMBean implementation --------------------------- - - @Override - public String getName() { - return managedQueue.getName(); - } - - @Override - public String getAddress() { - return managedQueue.getAddress(); - } - - @Override - public boolean isTemporary() { - return managedQueue.isTemporary(); - } - - @Override - public long getMessageCount() { - return coreQueueControl.getMessageCount(); - } - - @Override - public long getMessagesAdded() { - return coreQueueControl.getMessagesAdded(); - } - - @Override - public long getMessagesExpired() { - return coreQueueControl.getMessagesExpired(); - } - - @Override - public long getMessagesKilled() { - return coreQueueControl.getMessagesKilled(); - } - - @Override - public int getConsumerCount() { - return coreQueueControl.getConsumerCount(); - } - - @Override - public int getDeliveringCount() { - return coreQueueControl.getDeliveringCount(); - } - - @Override - public long getScheduledCount() { - return coreQueueControl.getScheduledCount(); - } - - public boolean isDurable() { - return coreQueueControl.isDurable(); - } - - @Override - public String getDeadLetterAddress() { - return coreQueueControl.getDeadLetterAddress(); - } - - @Override - public String getExpiryAddress() { - return coreQueueControl.getExpiryAddress(); - } - - @Override - public String getFirstMessageAsJSON() throws Exception { - return coreQueueControl.getFirstMessageAsJSON(); - } - - @Override - public Long getFirstMessageTimestamp() throws Exception { - return coreQueueControl.getFirstMessageTimestamp(); - } - - @Override - public Long getFirstMessageAge() throws Exception { - return coreQueueControl.getFirstMessageAge(); - } - - @Override - public void addBinding(String binding) throws Exception { - jmsServerManager.addQueueToBindingRegistry(managedQueue.getName(), binding); - } - - @Override - public String[] getRegistryBindings() { - return jmsServerManager.getBindingsOnQueue(managedQueue.getName()); - } - - @Override - public boolean removeMessage(final String messageID) throws Exception { - String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID); - int removed = coreQueueControl.removeMessages(filter); - if (removed != 1) { - throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID); - } - return true; - } - - @Override - public int removeMessages(final String filterStr) throws Exception { - String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr); - return coreQueueControl.removeMessages(filter); - } - - @Override - public Map<String, Object>[] listMessages(final String filterStr) throws Exception { - try { - String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr); - Map<String, Object>[] coreMessages = coreQueueControl.listMessages(filter); - - return toJMSMap(coreMessages); - } catch (ActiveMQException e) { - throw new IllegalStateException(e.getMessage()); - } - } - - private Map<String, Object>[] toJMSMap(Map<String, Object>[] coreMessages) { - Map<String, Object>[] jmsMessages = new Map[coreMessages.length]; - - int i = 0; - - for (Map<String, Object> coreMessage : coreMessages) { - Map<String, Object> jmsMessage = ActiveMQMessage.coreMaptoJMSMap(coreMessage); - jmsMessages[i++] = jmsMessage; - } - return jmsMessages; - } - - private CompositeData toJMSCompositeType(CompositeDataSupport data) throws Exception { - return JMSOpenTypeSupport.convert(data); - } - - @Override - public Map<String, Object>[] listScheduledMessages() throws Exception { - Map<String, Object>[] coreMessages = coreQueueControl.listScheduledMessages(); - - return toJMSMap(coreMessages); - } - - @Override - public String listScheduledMessagesAsJSON() throws Exception { - return coreQueueControl.listScheduledMessagesAsJSON(); - } - - @Override - public Map<String, Map<String, Object>[]> listDeliveringMessages() throws Exception { - try { - Map<String, Map<String, Object>[]> returnMap = new HashMap<>(); - - // the workingMap from the queue-control - Map<String, Map<String, Object>[]> workingMap = coreQueueControl.listDeliveringMessages(); - - for (Map.Entry<String, Map<String, Object>[]> entry : workingMap.entrySet()) { - returnMap.put(entry.getKey(), toJMSMap(entry.getValue())); - } - - return returnMap; - } catch (ActiveMQException e) { - throw new IllegalStateException(e.getMessage()); - } - } - - @Override - public String listDeliveringMessagesAsJSON() throws Exception { - return coreQueueControl.listDeliveringMessagesAsJSON(); - } - - @Override - public String listMessagesAsJSON(final String filter) throws Exception { - return JMSQueueControlImpl.toJSON(listMessages(filter)); - } - - @Override - public long countMessages(final String filterStr) throws Exception { - String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr); - return coreQueueControl.countMessages(filter); - } - - @Override - public boolean expireMessage(final String messageID) throws Exception { - String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID); - int expired = coreQueueControl.expireMessages(filter); - if (expired != 1) { - throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID); - } - return true; - } - - @Override - public int expireMessages(final String filterStr) throws Exception { - String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr); - return coreQueueControl.expireMessages(filter); - } - - @Override - public boolean sendMessageToDeadLetterAddress(final String messageID) throws Exception { - String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID); - int dead = coreQueueControl.sendMessagesToDeadLetterAddress(filter); - if (dead != 1) { - throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID); - } - return true; - } - - @Override - public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception { - String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr); - return coreQueueControl.sendMessagesToDeadLetterAddress(filter); - } - - @Override - public String sendTextMessageWithProperties(String properties) throws Exception { - String[] kvs = properties.split(","); - Map<String, String> props = new HashMap<>(); - for (String kv : kvs) { - String[] it = kv.split("="); - if (it.length == 2) { - props.put(it[0], it[1]); - } - } - return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password")); - } - - @Override - public String sendTextMessage(String body) throws Exception { - return sendTextMessage(Collections.EMPTY_MAP, body); - } - - @Override - public String sendTextMessage(Map<String, String> headers, String body) throws Exception { - return sendTextMessage(headers, body, null, null); - } - - @Override - public String sendTextMessage(String body, String user, String password) throws Exception { - return sendTextMessage(Collections.EMPTY_MAP, body, user, password); - } - - @Override - public String sendTextMessage(Map<String, String> headers, - String body, - String user, - String password) throws Exception { - boolean durable = false; - if (headers.containsKey("JMSDeliveryMode")) { - String jmsDeliveryMode = headers.remove("JMSDeliveryMode"); - if (jmsDeliveryMode != null && (jmsDeliveryMode.equals("2") || jmsDeliveryMode.equalsIgnoreCase("PERSISTENT"))) { - durable = true; - } - } - String userID = UUIDGenerator.getInstance().generateStringUUID(); - ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(56); - buffer.writeNullableSimpleString(new SimpleString(body)); - byte[] bytes = new byte[buffer.readableBytes()]; - buffer.readBytes(bytes); - coreQueueControl.sendMessage(headers, Message.TEXT_TYPE, Base64.encodeBytes(bytes), userID, durable, user, password); - return userID; - } - - @Override - public boolean changeMessagePriority(final String messageID, final int newPriority) throws Exception { - String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID); - int changed = coreQueueControl.changeMessagesPriority(filter, newPriority); - if (changed != 1) { - throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID); - } - return true; - } - - @Override - public int changeMessagesPriority(final String filterStr, final int newPriority) throws Exception { - String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr); - return coreQueueControl.changeMessagesPriority(filter, newPriority); - } - - @Override - public boolean retryMessage(final String jmsMessageID) throws Exception { - - // Figure out messageID from JMSMessageID. - final String filter = createFilterForJMSMessageID(jmsMessageID); - Map<String, Object>[] messages = coreQueueControl.listMessages(filter); - if (messages.length != 1) { // if no messages. There should not be more than one, JMSMessageID should be unique. - return false; - } - - final Map<String, Object> messageToRedeliver = messages[0]; - Long messageID = (Long) messageToRedeliver.get("messageID"); - return messageID != null && coreQueueControl.retryMessage(messageID); - } - - @Override - public int retryMessages() throws Exception { - return coreQueueControl.retryMessages(); - } - - @Override - public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception { - return moveMessage(messageID, otherQueueName, false); - } - - @Override - public boolean moveMessage(final String messageID, - final String otherQueueName, - final boolean rejectDuplicates) throws Exception { - String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID); - ActiveMQDestination otherQueue = ActiveMQDestination.createQueue(otherQueueName); - int moved = coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates); - if (moved != 1) { - throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID); - } - - return true; - } - - @Override - public int moveMessages(final String filterStr, - final String otherQueueName, - final boolean rejectDuplicates) throws Exception { - String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr); - ActiveMQDestination otherQueue = ActiveMQDestination.createQueue(otherQueueName); - return coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates); - } - - @Override - public int moveMessages(final String filterStr, final String otherQueueName) throws Exception { - return moveMessages(filterStr, otherQueueName, false); - } - - @Override - @Operation(desc = "List all the existent consumers on the Queue") - public String listConsumersAsJSON() throws Exception { - return coreQueueControl.listConsumersAsJSON(); - } - - @Override - public String listMessageCounter() { - try { - return MessageCounterInfo.toJSon(counter); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - @Override - public void resetMessageCounter() throws Exception { - coreQueueControl.resetMessageCounter(); - } - - @Override - public String listMessageCounterAsHTML() { - return MessageCounterHelper.listMessageCounterAsHTML(new MessageCounter[]{counter}); - } - - @Override - public String listMessageCounterHistory() throws Exception { - return MessageCounterHelper.listMessageCounterHistory(counter); - } - - @Override - public String listMessageCounterHistoryAsHTML() { - return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[]{counter}); - } - - @Override - public boolean isPaused() throws Exception { - return coreQueueControl.isPaused(); - } - - @Override - public void pause() throws Exception { - coreQueueControl.pause(); - } - - @Override - public void pause(boolean persist) throws Exception { - coreQueueControl.pause(persist); - } - - @Override - public void resume() throws Exception { - coreQueueControl.resume(); - } - - @Override - public CompositeData[] browse() throws Exception { - return browse(null); - } - - @Override - public CompositeData[] browse(String filter) throws Exception { - try { - CompositeData[] messages = coreQueueControl.browse(filter); - - ArrayList<CompositeData> c = new ArrayList<>(); - - for (CompositeData message : messages) { - c.add(toJMSCompositeType((CompositeDataSupport) message)); - } - CompositeData[] rc = new CompositeData[c.size()]; - c.toArray(rc); - return rc; - } catch (ActiveMQInvalidFilterExpressionException e) { - throw new InvalidSelectorException(e.getMessage()); - } - } - - @Override - public String getSelector() { - return coreQueueControl.getFilter(); - } - - @Override - public void flushExecutor() { - coreQueueControl.flushExecutor(); - } - - @Override - public MBeanInfo getMBeanInfo() { - MBeanInfo info = super.getMBeanInfo(); - return new MBeanInfo(info.getClassName(), info.getDescription(), MBeanInfoHelper.getMBeanAttributesInfo(JMSQueueControl.class), info.getConstructors(), MBeanInfoHelper.getMBeanOperationsInfo(JMSQueueControl.class), info.getNotifications()); - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae40a3d3/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSServerControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSServerControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSServerControlImpl.java deleted file mode 100644 index e9e2f3c..0000000 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSServerControlImpl.java +++ /dev/null @@ -1,876 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.jms.management.impl; - -import javax.json.JsonArray; -import javax.json.JsonArrayBuilder; -import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; -import javax.management.ListenerNotFoundException; -import javax.management.MBeanAttributeInfo; -import javax.management.MBeanNotificationInfo; -import javax.management.MBeanOperationInfo; -import javax.management.Notification; -import javax.management.NotificationBroadcasterSupport; -import javax.management.NotificationEmitter; -import javax.management.NotificationFilter; -import javax.management.NotificationListener; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.api.core.client.ClientSession; -import org.apache.activemq.artemis.api.core.management.Parameter; -import org.apache.activemq.artemis.api.jms.JMSFactoryType; -import org.apache.activemq.artemis.api.jms.management.ConnectionFactoryControl; -import org.apache.activemq.artemis.api.jms.management.DestinationControl; -import org.apache.activemq.artemis.api.jms.management.JMSQueueControl; -import org.apache.activemq.artemis.api.jms.management.JMSServerControl; -import org.apache.activemq.artemis.api.jms.management.TopicControl; -import org.apache.activemq.artemis.core.client.impl.Topology; -import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; -import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.management.impl.AbstractControl; -import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper; -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; -import org.apache.activemq.artemis.core.server.cluster.ClusterManager; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.jms.server.ActiveMQJMSServerLogger; -import org.apache.activemq.artemis.jms.server.JMSServerManager; -import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration; -import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl; -import org.apache.activemq.artemis.jms.server.management.JMSNotificationType; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.utils.JsonLoader; -import org.apache.activemq.artemis.utils.TypedProperties; - -public class JMSServerControlImpl extends AbstractControl implements JMSServerControl, NotificationEmitter, org.apache.activemq.artemis.core.server.management.NotificationListener { - - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - private final JMSServerManager server; - - private final NotificationBroadcasterSupport broadcaster; - - private final AtomicLong notifSeq = new AtomicLong(0); - - // Static -------------------------------------------------------- - - private static String[] convert(final Object[] bindings) { - String[] theBindings = new String[bindings.length]; - for (int i = 0, bindingsLength = bindings.length; i < bindingsLength; i++) { - theBindings[i] = bindings[i].toString().trim(); - } - return theBindings; - } - - private static String[] toArray(final String commaSeparatedString) { - if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0) { - return new String[0]; - } - String[] values = commaSeparatedString.split(","); - String[] trimmed = new String[values.length]; - for (int i = 0; i < values.length; i++) { - trimmed[i] = values[i].trim(); - trimmed[i] = trimmed[i].replace(",", ","); - } - return trimmed; - } - - public static MBeanNotificationInfo[] getNotificationInfos() { - JMSNotificationType[] values = JMSNotificationType.values(); - String[] names = new String[values.length]; - for (int i = 0; i < values.length; i++) { - names[i] = values[i].toString(); - } - return new MBeanNotificationInfo[]{new MBeanNotificationInfo(names, JMSServerControl.class.getName(), "Notifications emitted by a JMS Server")}; - } - - // Constructors -------------------------------------------------- - - public JMSServerControlImpl(final JMSServerManager server) throws Exception { - super(JMSServerControl.class, server.getActiveMQServer().getStorageManager()); - this.server = server; - broadcaster = new NotificationBroadcasterSupport(); - server.getActiveMQServer().getManagementService().addNotificationListener(this); - } - - // Public -------------------------------------------------------- - - // JMSServerControlMBean implementation -------------------------- - - /** - * See the interface definition for the javadoc. - */ - @Override - public void createConnectionFactory(String name, - boolean ha, - boolean useDiscovery, - int cfType, - String[] connectorNames, - Object[] bindings) throws Exception { - checkStarted(); - - clearIO(); - - try { - if (useDiscovery) { - if (connectorNames == null || connectorNames.length == 0) { - throw new IllegalArgumentException("no discovery group name supplied"); - } - server.createConnectionFactory(name, ha, JMSFactoryType.valueOf(cfType), connectorNames[0], JMSServerControlImpl.convert(bindings)); - } else { - List<String> connectorList = new ArrayList<>(connectorNames.length); - - for (String str : connectorNames) { - connectorList.add(str); - } - - server.createConnectionFactory(name, ha, JMSFactoryType.valueOf(cfType), connectorList, JMSServerControlImpl.convert(bindings)); - } - } finally { - blockOnIO(); - } - } - - @Override - public void createConnectionFactory(String name, - boolean ha, - boolean useDiscovery, - int cfType, - String connectors, - String bindings, - String clientID, - long clientFailureCheckPeriod, - long connectionTTL, - long callTimeout, - long callFailoverTimeout, - int minLargeMessageSize, - boolean compressLargeMessages, - int consumerWindowSize, - int consumerMaxRate, - int confirmationWindowSize, - int producerWindowSize, - int producerMaxRate, - boolean blockOnAcknowledge, - boolean blockOnDurableSend, - boolean blockOnNonDurableSend, - boolean autoGroup, - boolean preAcknowledge, - String loadBalancingPolicyClassName, - int transactionBatchSize, - int dupsOKBatchSize, - boolean useGlobalPools, - int scheduledThreadPoolMaxSize, - int threadPoolMaxSize, - long retryInterval, - double retryIntervalMultiplier, - long maxRetryInterval, - int reconnectAttempts, - boolean failoverOnInitialConnection, - String groupId) throws Exception { - createConnectionFactory(name, ha, useDiscovery, cfType, toArray(connectors), toArray(bindings), clientID, clientFailureCheckPeriod, connectionTTL, callTimeout, callFailoverTimeout, minLargeMessageSize, compressLargeMessages, consumerWindowSize, consumerMaxRate, confirmationWindowSize, producerWindowSize, producerMaxRate, blockOnAcknowledge, blockOnDurableSend, blockOnNonDurableSend, autoGroup, preAcknowledge, loadBalancingPolicyClassName, transactionBatchSize, dupsOKBatchSize, useGlobalPools, scheduledThreadPoolMaxSize, threadPoolMaxSize, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, failoverOnInitialConnection, groupId); - } - - @Override - public void createConnectionFactory(String name, - boolean ha, - boolean useDiscovery, - int cfType, - String[] connectorNames, - String[] bindings, - String clientID, - long clientFailureCheckPeriod, - long connectionTTL, - long callTimeout, - long callFailoverTimeout, - int minLargeMessageSize, - boolean compressLargeMessages, - int consumerWindowSize, - int consumerMaxRate, - int confirmationWindowSize, - int producerWindowSize, - int producerMaxRate, - boolean blockOnAcknowledge, - boolean blockOnDurableSend, - boolean blockOnNonDurableSend, - boolean autoGroup, - boolean preAcknowledge, - String loadBalancingPolicyClassName, - int transactionBatchSize, - int dupsOKBatchSize, - boolean useGlobalPools, - int scheduledThreadPoolMaxSize, - int threadPoolMaxSize, - long retryInterval, - double retryIntervalMultiplier, - long maxRetryInterval, - int reconnectAttempts, - boolean failoverOnInitialConnection, - String groupId) throws Exception { - checkStarted(); - - clearIO(); - - try { - ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl().setName(name).setHA(ha).setBindings(bindings).setFactoryType(JMSFactoryType.valueOf(cfType)).setClientID(clientID).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setMinLargeMessageSize(minLargeMessageSize).setCompressLargeMessages(compressLargeMessages).setConsumerWindowSize(consumerWindowSize).setConsumerMaxRate(consumerMaxRate).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setProducerMaxRate(producerMaxRate).setBlockOnAcknowledge(blockOnAcknowledge).setBlockOnDurableSend(blockOnDurableSend).setBlockOnNonDurableSend(blockOnNonDurableSend).setAutoGroup(autoGroup).setPreAcknowledge(preAcknowledge).setTransactionBatchSize(transactionBatchSize).setDupsOKBatchSize(dupsOKBatchSize).setUseGlobalPools(useGlobalPools).setScheduledThreadPoolM axSize(scheduledThreadPoolMaxSize).setThreadPoolMaxSize(threadPoolMaxSize).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setReconnectAttempts(reconnectAttempts).setFailoverOnInitialConnection(failoverOnInitialConnection).setGroupID(groupId); - - if (useDiscovery) { - configuration.setDiscoveryGroupName(connectorNames[0]); - } else { - ArrayList<String> connectorNamesList = new ArrayList<>(); - for (String nameC : connectorNames) { - connectorNamesList.add(nameC); - } - configuration.setConnectorNames(connectorNamesList); - } - - if (loadBalancingPolicyClassName != null && !loadBalancingPolicyClassName.trim().equals("")) { - configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName); - } - - server.createConnectionFactory(true, configuration, bindings); - } finally { - blockOnIO(); - } - } - - /** - * Create a JMS ConnectionFactory with the specified name connected to a single live-backup pair of servers. - * <br> - * The ConnectionFactory is bound to the Registry for all the specified bindings Strings. - */ - @Override - public void createConnectionFactory(String name, - boolean ha, - boolean useDiscovery, - int cfType, - String connectors, - String bindings) throws Exception { - createConnectionFactory(name, ha, useDiscovery, cfType, toArray(connectors), toArray(bindings)); - } - - @Override - public boolean createQueue(final String name) throws Exception { - return createQueue(name, null, null, true); - } - - @Override - public boolean createQueue(final String name, final String bindings) throws Exception { - return createQueue(name, bindings, null, true); - } - - @Override - public boolean createQueue(String name, String bindings, String selector) throws Exception { - return createQueue(name, bindings, selector, true); - } - - @Override - public boolean createQueue(@Parameter(name = "name", desc = "Name of the queue to create") String name, - @Parameter(name = "bindings", desc = "comma-separated list of Registry bindings (use ',' if u need to use commas in your bindings name)") String bindings, - @Parameter(name = "selector", desc = "the jms selector") String selector, - @Parameter(name = "durable", desc = "is the queue persistent and resilient to restart") boolean durable) throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.createQueue(true, name, selector, durable, JMSServerControlImpl.toArray(bindings)); - } finally { - blockOnIO(); - } - } - - @Override - public boolean destroyQueue(final String name) throws Exception { - return destroyQueue(name, false); - } - - @Override - public boolean destroyQueue(final String name, final boolean removeConsumers) throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.destroyQueue(name, removeConsumers); - } finally { - blockOnIO(); - } - } - - @Override - public boolean createTopic(String name) throws Exception { - return createTopic(name, null); - } - - @Override - public boolean createTopic(final String topicName, final String bindings) throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.createTopic(true, topicName, JMSServerControlImpl.toArray(bindings)); - } finally { - blockOnIO(); - } - } - - @Override - public boolean destroyTopic(final String name) throws Exception { - return destroyTopic(name, true); - } - - @Override - public boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.destroyTopic(name, removeConsumers); - } finally { - blockOnIO(); - } - } - - @Override - public void destroyConnectionFactory(final String name) throws Exception { - checkStarted(); - - clearIO(); - - try { - server.destroyConnectionFactory(name); - } finally { - blockOnIO(); - } - } - - @Override - public boolean isStarted() { - return server.isStarted(); - } - - @Override - public String getVersion() { - checkStarted(); - - return server.getVersion(); - } - - @Override - public String[] getQueueNames() { - checkStarted(); - - clearIO(); - - try { - Object[] queueControls = server.getActiveMQServer().getManagementService().getResources(JMSQueueControl.class); - String[] names = new String[queueControls.length]; - for (int i = 0; i < queueControls.length; i++) { - JMSQueueControl queueControl = (JMSQueueControl) queueControls[i]; - names[i] = queueControl.getName(); - } - return names; - } finally { - blockOnIO(); - } - } - - @Override - public String[] getTopicNames() { - checkStarted(); - - clearIO(); - - try { - Object[] topicControls = server.getActiveMQServer().getManagementService().getResources(TopicControl.class); - String[] names = new String[topicControls.length]; - for (int i = 0; i < topicControls.length; i++) { - TopicControl topicControl = (TopicControl) topicControls[i]; - names[i] = topicControl.getName(); - } - return names; - } finally { - blockOnIO(); - } - } - - @Override - public String[] getConnectionFactoryNames() { - checkStarted(); - - clearIO(); - - try { - Object[] cfControls = server.getActiveMQServer().getManagementService().getResources(ConnectionFactoryControl.class); - String[] names = new String[cfControls.length]; - for (int i = 0; i < cfControls.length; i++) { - ConnectionFactoryControl cfControl = (ConnectionFactoryControl) cfControls[i]; - names[i] = cfControl.getName(); - } - return names; - } finally { - blockOnIO(); - } - } - - @Override - public String getNodeID() { - return server.getActiveMQServer().getNodeID().toString(); - } - - // NotificationEmitter implementation ---------------------------- - - @Override - public void removeNotificationListener(final NotificationListener listener, - final NotificationFilter filter, - final Object handback) throws ListenerNotFoundException { - broadcaster.removeNotificationListener(listener, filter, handback); - } - - @Override - public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException { - broadcaster.removeNotificationListener(listener); - } - - @Override - public void addNotificationListener(final NotificationListener listener, - final NotificationFilter filter, - final Object handback) throws IllegalArgumentException { - broadcaster.addNotificationListener(listener, filter, handback); - } - - @Override - public MBeanNotificationInfo[] getNotificationInfo() { - return JMSServerControlImpl.getNotificationInfos(); - } - - @Override - public String[] listRemoteAddresses() throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.listRemoteAddresses(); - } finally { - blockOnIO(); - } - } - - @Override - public String[] listRemoteAddresses(final String ipAddress) throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.listRemoteAddresses(ipAddress); - } finally { - blockOnIO(); - } - } - - @Override - public boolean closeConnectionsForAddress(final String ipAddress) throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.closeConnectionsForAddress(ipAddress); - } finally { - blockOnIO(); - } - } - - @Override - public boolean closeConsumerConnectionsForAddress(final String address) throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.closeConsumerConnectionsForAddress(address); - } finally { - blockOnIO(); - } - } - - @Override - public boolean closeConnectionsForUser(final String userName) throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.closeConnectionsForUser(userName); - } finally { - blockOnIO(); - } - } - - @Override - public String[] listConnectionIDs() throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.listConnectionIDs(); - } finally { - blockOnIO(); - } - } - - @Override - public String listConnectionsAsJSON() throws Exception { - checkStarted(); - - clearIO(); - - try { - JsonArrayBuilder array = JsonLoader.createArrayBuilder(); - - Set<RemotingConnection> connections = server.getActiveMQServer().getRemotingService().getConnections(); - - Set<ServerSession> sessions = server.getActiveMQServer().getSessions(); - - Map<Object, ServerSession> jmsSessions = new HashMap<>(); - - // First separate the real jms sessions, after all we are only interested in those here on the *jms* server controller - for (ServerSession session : sessions) { - if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) { - jmsSessions.put(session.getConnectionID(), session); - } - } - - for (RemotingConnection connection : connections) { - ServerSession session = jmsSessions.get(connection.getID()); - if (session != null) { - JsonObjectBuilder objectBuilder = JsonLoader.createObjectBuilder().add("connectionID", connection.getID().toString()).add("clientAddress", connection.getRemoteAddress()).add("creationTime", connection.getCreationTime()); - - if (session.getMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY) != null) { - objectBuilder.add("clientID", session.getMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY)); - } - - if (session.getUsername() != null) { - objectBuilder.add("principal", session.getUsername()); - } - - array.add(objectBuilder.build()); - } - } - return array.build().toString(); - } finally { - blockOnIO(); - } - } - - @Override - public String listConsumersAsJSON(String connectionID) throws Exception { - checkStarted(); - - clearIO(); - - try { - JsonArrayBuilder array = JsonLoader.createArrayBuilder(); - - Set<RemotingConnection> connections = server.getActiveMQServer().getRemotingService().getConnections(); - for (RemotingConnection connection : connections) { - if (connectionID.equals(connection.getID().toString())) { - List<ServerSession> sessions = server.getActiveMQServer().getSessions(connectionID); - for (ServerSession session : sessions) { - Set<ServerConsumer> consumers = session.getServerConsumers(); - for (ServerConsumer consumer : consumers) { - JsonObject obj = toJSONObject(consumer); - if (obj != null) { - array.add(obj); - } - } - } - } - } - return array.build().toString(); - } finally { - blockOnIO(); - } - } - - @Override - public String listAllConsumersAsJSON() throws Exception { - checkStarted(); - - clearIO(); - - try { - JsonArray jsonArray = toJsonArray(server.getActiveMQServer().getSessions()); - return jsonArray.toString(); - } finally { - blockOnIO(); - } - } - - @Override - public String[] listSessions(final String connectionID) throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.listSessions(connectionID); - } finally { - blockOnIO(); - } - } - - @Override - public String listPreparedTransactionDetailsAsJSON() throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.listPreparedTransactionDetailsAsJSON(); - } finally { - blockOnIO(); - } - } - - @Override - public String listPreparedTransactionDetailsAsHTML() throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.listPreparedTransactionDetailsAsHTML(); - } finally { - blockOnIO(); - } - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.management.impl.AbstractControl#fillMBeanOperationInfo() - */ - @Override - protected MBeanOperationInfo[] fillMBeanOperationInfo() { - return MBeanInfoHelper.getMBeanOperationsInfo(JMSServerControl.class); - } - - @Override - protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { - return MBeanInfoHelper.getMBeanAttributesInfo(JMSServerControl.class); - } - - // Private ------------------------------------------------------- - - private void checkStarted() { - if (!server.isStarted()) { - throw new IllegalStateException("ActiveMQ Artemis JMS Server is not started. It can not be managed yet"); - } - } - - // Inner classes ------------------------------------------------- - - @Override - public String[] listTargetDestinations(String sessionID) throws Exception { - String[] addresses = server.getActiveMQServer().getActiveMQServerControl().listTargetAddresses(sessionID); - Map<String, DestinationControl> allDests = new HashMap<>(); - - Object[] queueControls = server.getActiveMQServer().getManagementService().getResources(JMSQueueControl.class); - for (Object queueControl2 : queueControls) { - JMSQueueControl queueControl = (JMSQueueControl) queueControl2; - allDests.put(queueControl.getAddress(), queueControl); - } - - Object[] topicControls = server.getActiveMQServer().getManagementService().getResources(TopicControl.class); - for (Object topicControl2 : topicControls) { - TopicControl topicControl = (TopicControl) topicControl2; - allDests.put(topicControl.getAddress(), topicControl); - } - - List<String> destinations = new ArrayList<>(); - for (String addresse : addresses) { - DestinationControl control = allDests.get(addresse); - if (control != null) { - destinations.add(control.getAddress()); - } - } - return destinations.toArray(new String[destinations.size()]); - } - - @Override - public String getLastSentMessageID(String sessionID, String address) throws Exception { - ServerSession session = server.getActiveMQServer().getSessionByID(sessionID); - if (session != null) { - return session.getLastSentMessageID(address); - } - return null; - } - - @Override - public String getSessionCreationTime(String sessionID) throws Exception { - ServerSession session = server.getActiveMQServer().getSessionByID(sessionID); - if (session != null) { - return String.valueOf(session.getCreationTime()); - } - return null; - } - - @Override - public String listSessionsAsJSON(final String connectionID) throws Exception { - checkStarted(); - - clearIO(); - - try { - return server.listSessionsAsJSON(connectionID); - } finally { - blockOnIO(); - } - } - - @Override - public String listNetworkTopology() throws Exception { - checkStarted(); - - clearIO(); - try { - JsonArrayBuilder brokers = JsonLoader.createArrayBuilder(); - ClusterManager clusterManager = server.getActiveMQServer().getClusterManager(); - if (clusterManager != null) { - Set<ClusterConnection> clusterConnections = clusterManager.getClusterConnections(); - for (ClusterConnection clusterConnection : clusterConnections) { - Topology topology = clusterConnection.getTopology(); - Collection<TopologyMemberImpl> members = topology.getMembers(); - for (TopologyMemberImpl member : members) { - - JsonObjectBuilder obj = JsonLoader.createObjectBuilder(); - TransportConfiguration live = member.getLive(); - if (live != null) { - obj.add("nodeID", member.getNodeId()).add("live", live.getParams().get("host") + ":" + live.getParams().get("port")); - TransportConfiguration backup = member.getBackup(); - if (backup != null) { - obj.add("backup", backup.getParams().get("host") + ":" + backup.getParams().get("port")); - } - } - brokers.add(obj); - } - } - } - return brokers.build().toString(); - } finally { - blockOnIO(); - } - } - - @Override - public String closeConnectionWithClientID(final String clientID) throws Exception { - return server.getActiveMQServer().destroyConnectionWithSessionMetadata(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID); - } - - private String determineJMSDestinationType(Queue queue) { - String result; - if (server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(queue.getAddress().toString())).getRoutingType() == AddressInfo.RoutingType.ANYCAST) { - if (queue.isTemporary()) { - result = "tempqueue"; - } else { - result = "queue"; - } - } else if (server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(queue.getAddress().toString())).getRoutingType() == AddressInfo.RoutingType.MULTICAST) { - if (queue.isTemporary()) { - result = "temptopic"; - } else { - result = "topic"; - } - } else { - ActiveMQJMSServerLogger.LOGGER.debug("JMSServerControlImpl.determineJMSDestinationType() " + queue); - // not related to JMS - return null; - } - return result; - } - - private JsonObject toJSONObject(ServerConsumer consumer) { - AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(consumer.getQueue().getAddress().toString())); - if (addressInfo == null) { - return null; - } - JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("consumerID", consumer.getID()).add("connectionID", consumer.getConnectionID().toString()).add("sessionID", consumer.getSessionID()).add("queueName", consumer.getQueue().getName().toString()).add("browseOnly", consumer.isBrowseOnly()).add("creationTime", consumer.getCreationTime()).add("destinationName", consumer.getQueue().getAddress().toString()).add("destinationType", determineJMSDestinationType(consumer.getQueue())); - // JMS consumer with message filter use the queue's filter - Filter queueFilter = consumer.getQueue().getFilter(); - if (queueFilter != null) { - obj.add("filter", queueFilter.getFilterString().toString()); - } - - if (addressInfo.getRoutingType().equals(AddressInfo.RoutingType.MULTICAST)) { - if (consumer.getQueue().isTemporary()) { - obj.add("durable", false); - } else { - obj.add("durable", true); - } - } else { - obj.add("durable", false); - } - - return obj.build(); - } - - @Override - public void onNotification(org.apache.activemq.artemis.core.server.management.Notification notification) { - if (!(notification.getType() instanceof JMSNotificationType)) - return; - JMSNotificationType type = (JMSNotificationType) notification.getType(); - TypedProperties prop = notification.getProperties(); - - this.broadcaster.sendNotification(new Notification(type.toString(), this, notifSeq.incrementAndGet(), prop.getSimpleStringProperty(JMSNotificationType.MESSAGE).toString())); - } - - private JsonArray toJsonArray(Collection<ServerSession> sessions) { - JsonArrayBuilder array = JsonLoader.createArrayBuilder(); - - for (ServerSession session : sessions) { - Set<ServerConsumer> consumers = session.getServerConsumers(); - for (ServerConsumer consumer : consumers) { - JsonObject obj = toJSONObject(consumer); - if (obj != null) { - array.add(obj); - } - } - } - return array.build(); - } - -}
