/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.jms.management.impl;

import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanOperationInfo;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationEmitter;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.api.jms.management.ConnectionFactoryControl;
import org.apache.activemq.artemis.api.jms.management.DestinationControl;
import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
import org.apache.activemq.artemis.api.jms.management.TopicControl;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.management.impl.AbstractControl;
import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.server.ActiveMQJMSServerLogger;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.apache.activemq.artemis.jms.server.management.JMSNotificationType;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.TypedProperties;

/**
 * Management control for a JMS server.
 * <p>
 * Almost every operation delegates to the wrapped {@link JMSServerManager} (or to the
 * core server it exposes). Operations that change or read server state first verify the
 * JMS server is started and are bracketed by {@code clearIO()} / {@code blockOnIO()},
 * both inherited from {@link AbstractControl}.
 * <p>
 * The control also registers itself as a listener on the core management service and
 * re-broadcasts notifications whose type is a {@link JMSNotificationType} as JMX
 * {@link Notification}s.
 */
public class JMSServerControlImpl extends AbstractControl implements JMSServerControl, NotificationEmitter, org.apache.activemq.artemis.core.server.management.NotificationListener {

   // Constants -----------------------------------------------------

   /** Escape sequence a caller uses for a literal comma inside one comma-separated entry. */
   private static final String ESCAPED_COMMA = "&comma;";

   // Attributes ----------------------------------------------------

   private final JMSServerManager server;

   private final NotificationBroadcasterSupport broadcaster;

   /** Source of the sequence numbers stamped on the JMX notifications sent by this control. */
   private final AtomicLong notifSeq = new AtomicLong(0);

   // Static --------------------------------------------------------

   /**
    * Converts binding objects to their trimmed string form.
    *
    * @param bindings the bindings to convert; {@code null} is treated as "no bindings",
    *                 in line with {@link #toArray(String)}
    * @return a new array holding {@code toString().trim()} of every element
    */
   private static String[] convert(final Object[] bindings) {
      if (bindings == null) {
         return new String[0];
      }
      String[] theBindings = new String[bindings.length];
      for (int i = 0; i < bindings.length; i++) {
         theBindings[i] = bindings[i].toString().trim();
      }
      return theBindings;
   }

   /**
    * Splits a comma-separated string into trimmed entries.
    * <p>
    * Because the comma is the separator, an entry that needs a literal comma must spell
    * it as {@code &comma;}; the escape is expanded after splitting.
    *
    * @param commaSeparatedString the raw list; may be {@code null} or blank
    * @return the entries, or an empty array for {@code null}/blank input
    */
   private static String[] toArray(final String commaSeparatedString) {
      if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0) {
         return new String[0];
      }
      String[] values = commaSeparatedString.split(",");
      String[] trimmed = new String[values.length];
      for (int i = 0; i < values.length; i++) {
         trimmed[i] = values[i].trim().replace(ESCAPED_COMMA, ",");
      }
      return trimmed;
   }

   /**
    * Returns the discovery group name, which callers pass as the first connector name.
    *
    * @throws IllegalArgumentException if no name was supplied
    */
   private static String discoveryGroupName(final String[] connectorNames) {
      if (connectorNames == null || connectorNames.length == 0) {
         throw new IllegalArgumentException("no discovery group name supplied");
      }
      return connectorNames[0];
   }

   /**
    * Copies the connector names into a mutable list. An empty array yields an empty list.
    *
    * @throws IllegalArgumentException if {@code connectorNames} is {@code null}
    */
   private static List<String> connectorList(final String[] connectorNames) {
      if (connectorNames == null) {
         throw new IllegalArgumentException("no connector names supplied");
      }
      List<String> connectors = new ArrayList<>(connectorNames.length);
      for (String connector : connectorNames) {
         connectors.add(connector);
      }
      return connectors;
   }

   /** Renders a transport configuration as {@code host:port} using its "host" and "port" params. */
   private static String hostAndPort(final TransportConfiguration transport) {
      Map<String, Object> params = transport.getParams();
      return params.get("host") + ":" + params.get("port");
   }

   /**
    * Describes the notifications this control can emit: one {@link MBeanNotificationInfo}
    * listing the string form of every {@link JMSNotificationType} value.
    */
   public static MBeanNotificationInfo[] getNotificationInfos() {
      JMSNotificationType[] values = JMSNotificationType.values();
      String[] names = new String[values.length];
      for (int i = 0; i < values.length; i++) {
         names[i] = values[i].toString();
      }
      return new MBeanNotificationInfo[]{new MBeanNotificationInfo(names, JMSServerControl.class.getName(), "Notifications emitted by a JMS Server")};
   }

   // Constructors --------------------------------------------------

   /**
    * Creates the control and subscribes it to the core server's management notifications.
    *
    * @param server the JMS server manager every operation delegates to
    */
   public JMSServerControlImpl(final JMSServerManager server) throws Exception {
      super(JMSServerControl.class, server.getActiveMQServer().getStorageManager());
      this.server = server;
      broadcaster = new NotificationBroadcasterSupport();
      // NOTE(review): 'this' is published before the constructor returns; a notification
      // delivered right away would still find all fields assigned, since this is the last statement.
      server.getActiveMQServer().getManagementService().addNotificationListener(this);
   }

   // Public --------------------------------------------------------

   // JMSServerControlMBean implementation --------------------------

   /**
    * See the interface definition for the javadoc.
    * <p>
    * With {@code useDiscovery}, the first element of {@code connectorNames} is taken as
    * the discovery group name; otherwise all elements are used as connector names.
    *
    * @throws IllegalArgumentException if the required discovery group name or the
    *                                  connector name array is missing
    */
   @Override
   public void createConnectionFactory(String name,
                                       boolean ha,
                                       boolean useDiscovery,
                                       int cfType,
                                       String[] connectorNames,
                                       Object[] bindings) throws Exception {
      checkStarted();

      clearIO();

      try {
         if (useDiscovery) {
            String discoveryGroup = discoveryGroupName(connectorNames);
            server.createConnectionFactory(name, ha, JMSFactoryType.valueOf(cfType), discoveryGroup, JMSServerControlImpl.convert(bindings));
         } else {
            List<String> connectors = connectorList(connectorNames);
            server.createConnectionFactory(name, ha, JMSFactoryType.valueOf(cfType), connectors, JMSServerControlImpl.convert(bindings));
         }
      } finally {
         blockOnIO();
      }
   }

   /**
    * Variant taking {@code connectors} and {@code bindings} as comma-separated strings;
    * both are split with {@link #toArray(String)} and passed to the array-based overload.
    */
   @Override
   public void createConnectionFactory(String name,
                                       boolean ha,
                                       boolean useDiscovery,
                                       int cfType,
                                       String connectors,
                                       String bindings,
                                       String clientID,
                                       long clientFailureCheckPeriod,
                                       long connectionTTL,
                                       long callTimeout,
                                       long callFailoverTimeout,
                                       int minLargeMessageSize,
                                       boolean compressLargeMessages,
                                       int consumerWindowSize,
                                       int consumerMaxRate,
                                       int confirmationWindowSize,
                                       int producerWindowSize,
                                       int producerMaxRate,
                                       boolean blockOnAcknowledge,
                                       boolean blockOnDurableSend,
                                       boolean blockOnNonDurableSend,
                                       boolean autoGroup,
                                       boolean preAcknowledge,
                                       String loadBalancingPolicyClassName,
                                       int transactionBatchSize,
                                       int dupsOKBatchSize,
                                       boolean useGlobalPools,
                                       int scheduledThreadPoolMaxSize,
                                       int threadPoolMaxSize,
                                       long retryInterval,
                                       double retryIntervalMultiplier,
                                       long maxRetryInterval,
                                       int reconnectAttempts,
                                       boolean failoverOnInitialConnection,
                                       String groupId) throws Exception {
      createConnectionFactory(name, ha, useDiscovery, cfType, toArray(connectors), toArray(bindings), clientID, clientFailureCheckPeriod, connectionTTL, callTimeout, callFailoverTimeout, minLargeMessageSize, compressLargeMessages, consumerWindowSize, consumerMaxRate, confirmationWindowSize, producerWindowSize, producerMaxRate, blockOnAcknowledge, blockOnDurableSend, blockOnNonDurableSend, autoGroup, preAcknowledge, loadBalancingPolicyClassName, transactionBatchSize, dupsOKBatchSize, useGlobalPools, scheduledThreadPoolMaxSize, threadPoolMaxSize, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, failoverOnInitialConnection, groupId);
   }

   /**
    * Builds a {@link ConnectionFactoryConfiguration} from the individual settings and asks
    * the server to create the connection factory from it.
    * <p>
    * With {@code useDiscovery}, the first element of {@code connectorNames} is the discovery
    * group name; otherwise all elements are connector names. The load-balancing policy class
    * name is only applied when it is neither {@code null} nor blank.
    *
    * @throws IllegalArgumentException if the required discovery group name or the
    *                                  connector name array is missing
    */
   @Override
   public void createConnectionFactory(String name,
                                       boolean ha,
                                       boolean useDiscovery,
                                       int cfType,
                                       String[] connectorNames,
                                       String[] bindings,
                                       String clientID,
                                       long clientFailureCheckPeriod,
                                       long connectionTTL,
                                       long callTimeout,
                                       long callFailoverTimeout,
                                       int minLargeMessageSize,
                                       boolean compressLargeMessages,
                                       int consumerWindowSize,
                                       int consumerMaxRate,
                                       int confirmationWindowSize,
                                       int producerWindowSize,
                                       int producerMaxRate,
                                       boolean blockOnAcknowledge,
                                       boolean blockOnDurableSend,
                                       boolean blockOnNonDurableSend,
                                       boolean autoGroup,
                                       boolean preAcknowledge,
                                       String loadBalancingPolicyClassName,
                                       int transactionBatchSize,
                                       int dupsOKBatchSize,
                                       boolean useGlobalPools,
                                       int scheduledThreadPoolMaxSize,
                                       int threadPoolMaxSize,
                                       long retryInterval,
                                       double retryIntervalMultiplier,
                                       long maxRetryInterval,
                                       int reconnectAttempts,
                                       boolean failoverOnInitialConnection,
                                       String groupId) throws Exception {
      checkStarted();

      clearIO();

      try {
         ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl()
            .setName(name)
            .setHA(ha)
            .setBindings(bindings)
            .setFactoryType(JMSFactoryType.valueOf(cfType))
            .setClientID(clientID)
            .setClientFailureCheckPeriod(clientFailureCheckPeriod)
            .setConnectionTTL(connectionTTL)
            .setCallTimeout(callTimeout)
            .setCallFailoverTimeout(callFailoverTimeout)
            .setMinLargeMessageSize(minLargeMessageSize)
            .setCompressLargeMessages(compressLargeMessages)
            .setConsumerWindowSize(consumerWindowSize)
            .setConsumerMaxRate(consumerMaxRate)
            .setConfirmationWindowSize(confirmationWindowSize)
            .setProducerWindowSize(producerWindowSize)
            .setProducerMaxRate(producerMaxRate)
            .setBlockOnAcknowledge(blockOnAcknowledge)
            .setBlockOnDurableSend(blockOnDurableSend)
            .setBlockOnNonDurableSend(blockOnNonDurableSend)
            .setAutoGroup(autoGroup)
            .setPreAcknowledge(preAcknowledge)
            .setTransactionBatchSize(transactionBatchSize)
            .setDupsOKBatchSize(dupsOKBatchSize)
            .setUseGlobalPools(useGlobalPools)
            .setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize)
            .setThreadPoolMaxSize(threadPoolMaxSize)
            .setRetryInterval(retryInterval)
            .setRetryIntervalMultiplier(retryIntervalMultiplier)
            .setMaxRetryInterval(maxRetryInterval)
            .setReconnectAttempts(reconnectAttempts)
            .setFailoverOnInitialConnection(failoverOnInitialConnection)
            .setGroupID(groupId);

         if (useDiscovery) {
            configuration.setDiscoveryGroupName(discoveryGroupName(connectorNames));
         } else {
            configuration.setConnectorNames(connectorList(connectorNames));
         }

         if (loadBalancingPolicyClassName != null && !loadBalancingPolicyClassName.trim().isEmpty()) {
            configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
         }

         server.createConnectionFactory(true, configuration, bindings);
      } finally {
         blockOnIO();
      }
   }

   /**
    * Create a JMS ConnectionFactory with the specified name connected to a single live-backup pair of servers.
    * <br>
    * The ConnectionFactory is bound to the Registry for all the specified bindings Strings.
    * <p>
    * {@code connectors} and {@code bindings} are comma-separated lists.
    */
   @Override
   public void createConnectionFactory(String name,
                                       boolean ha,
                                       boolean useDiscovery,
                                       int cfType,
                                       String connectors,
                                       String bindings) throws Exception {
      createConnectionFactory(name, ha, useDiscovery, cfType, toArray(connectors), toArray(bindings));
   }

   /** Creates a durable queue with no bindings and no selector. */
   @Override
   public boolean createQueue(final String name) throws Exception {
      return createQueue(name, null, null, true);
   }

   /** Creates a durable queue with the given comma-separated bindings and no selector. */
   @Override
   public boolean createQueue(final String name, final String bindings) throws Exception {
      return createQueue(name, bindings, null, true);
   }

   /** Creates a durable queue with the given comma-separated bindings and selector. */
   @Override
   public boolean createQueue(String name, String bindings, String selector) throws Exception {
      return createQueue(name, bindings, selector, true);
   }

   /**
    * Creates a queue through the JMS server manager.
    *
    * @param bindings comma-separated Registry bindings; {@code &comma;} stands for a literal comma
    * @return whatever {@link JMSServerManager} reports for the creation
    */
   @Override
   public boolean createQueue(@Parameter(name = "name", desc = "Name of the queue to create") String name,
                              @Parameter(name = "bindings", desc = "comma-separated list of Registry bindings (use '&comma;' if u need to use commas in your bindings name)") String bindings,
                              @Parameter(name = "selector", desc = "the jms selector") String selector,
                              @Parameter(name = "durable", desc = "is the queue persistent and resilient to restart") boolean durable) throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.createQueue(true, name, selector, durable, JMSServerControlImpl.toArray(bindings));
      } finally {
         blockOnIO();
      }
   }

   /** Destroys the queue without removing its consumers ({@code removeConsumers = false}). */
   @Override
   public boolean destroyQueue(final String name) throws Exception {
      return destroyQueue(name, false);
   }

   @Override
   public boolean destroyQueue(final String name, final boolean removeConsumers) throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.destroyQueue(name, removeConsumers);
      } finally {
         blockOnIO();
      }
   }

   /** Creates a topic with no bindings. */
   @Override
   public boolean createTopic(String name) throws Exception {
      return createTopic(name, null);
   }

   /**
    * Creates a topic through the JMS server manager.
    *
    * @param bindings comma-separated Registry bindings; may be {@code null}
    */
   @Override
   public boolean createTopic(final String topicName, final String bindings) throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.createTopic(true, topicName, JMSServerControlImpl.toArray(bindings));
      } finally {
         blockOnIO();
      }
   }

   /** Destroys the topic and removes its consumers ({@code removeConsumers = true}); note the queue variant defaults to {@code false}. */
   @Override
   public boolean destroyTopic(final String name) throws Exception {
      return destroyTopic(name, true);
   }

   @Override
   public boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.destroyTopic(name, removeConsumers);
      } finally {
         blockOnIO();
      }
   }

   @Override
   public void destroyConnectionFactory(final String name) throws Exception {
      checkStarted();

      clearIO();

      try {
         server.destroyConnectionFactory(name);
      } finally {
         blockOnIO();
      }
   }

   @Override
   public boolean isStarted() {
      return server.isStarted();
   }

   @Override
   public String getVersion() {
      checkStarted();

      return server.getVersion();
   }

   /** Returns the name of every {@link JMSQueueControl} registered with the management service. */
   @Override
   public String[] getQueueNames() {
      checkStarted();

      clearIO();

      try {
         Object[] queueControls = server.getActiveMQServer().getManagementService().getResources(JMSQueueControl.class);
         String[] names = new String[queueControls.length];
         for (int i = 0; i < queueControls.length; i++) {
            names[i] = ((JMSQueueControl) queueControls[i]).getName();
         }
         return names;
      } finally {
         blockOnIO();
      }
   }

   /** Returns the name of every {@link TopicControl} registered with the management service. */
   @Override
   public String[] getTopicNames() {
      checkStarted();

      clearIO();

      try {
         Object[] topicControls = server.getActiveMQServer().getManagementService().getResources(TopicControl.class);
         String[] names = new String[topicControls.length];
         for (int i = 0; i < topicControls.length; i++) {
            names[i] = ((TopicControl) topicControls[i]).getName();
         }
         return names;
      } finally {
         blockOnIO();
      }
   }

   /** Returns the name of every {@link ConnectionFactoryControl} registered with the management service. */
   @Override
   public String[] getConnectionFactoryNames() {
      checkStarted();

      clearIO();

      try {
         Object[] cfControls = server.getActiveMQServer().getManagementService().getResources(ConnectionFactoryControl.class);
         String[] names = new String[cfControls.length];
         for (int i = 0; i < cfControls.length; i++) {
            names[i] = ((ConnectionFactoryControl) cfControls[i]).getName();
         }
         return names;
      } finally {
         blockOnIO();
      }
   }

   // NOTE(review): unlike most operations this one does not call checkStarted(); presumably
   // the node ID may be unset before the core server starts - confirm before relying on it.
   @Override
   public String getNodeID() {
      return server.getActiveMQServer().getNodeID().toString();
   }

   // NotificationEmitter implementation ----------------------------

   @Override
   public void removeNotificationListener(final NotificationListener listener,
                                          final NotificationFilter filter,
                                          final Object handback) throws ListenerNotFoundException {
      broadcaster.removeNotificationListener(listener, filter, handback);
   }

   @Override
   public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException {
      broadcaster.removeNotificationListener(listener);
   }

   @Override
   public void addNotificationListener(final NotificationListener listener,
                                       final NotificationFilter filter,
                                       final Object handback) throws IllegalArgumentException {
      broadcaster.addNotificationListener(listener, filter, handback);
   }

   @Override
   public MBeanNotificationInfo[] getNotificationInfo() {
      return JMSServerControlImpl.getNotificationInfos();
   }

   @Override
   public String[] listRemoteAddresses() throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.listRemoteAddresses();
      } finally {
         blockOnIO();
      }
   }

   @Override
   public String[] listRemoteAddresses(final String ipAddress) throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.listRemoteAddresses(ipAddress);
      } finally {
         blockOnIO();
      }
   }

   @Override
   public boolean closeConnectionsForAddress(final String ipAddress) throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.closeConnectionsForAddress(ipAddress);
      } finally {
         blockOnIO();
      }
   }

   @Override
   public boolean closeConsumerConnectionsForAddress(final String address) throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.closeConsumerConnectionsForAddress(address);
      } finally {
         blockOnIO();
      }
   }

   @Override
   public boolean closeConnectionsForUser(final String userName) throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.closeConnectionsForUser(userName);
      } finally {
         blockOnIO();
      }
   }

   @Override
   public String[] listConnectionIDs() throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.listConnectionIDs();
      } finally {
         blockOnIO();
      }
   }

   /**
    * Lists, as a JSON array, the remoting connections that carry a JMS session.
    * <p>
    * Each entry has {@code connectionID}, {@code clientAddress} and {@code creationTime};
    * {@code clientID} and {@code principal} are added only when the session provides them.
    */
   @Override
   public String listConnectionsAsJSON() throws Exception {
      checkStarted();

      clearIO();

      try {
         JsonArrayBuilder array = JsonLoader.createArrayBuilder();

         Set<RemotingConnection> connections = server.getActiveMQServer().getRemotingService().getConnections();

         Set<ServerSession> sessions = server.getActiveMQServer().getSessions();

         Map<Object, ServerSession> jmsSessions = new HashMap<>();

         // First separate the real jms sessions, after all we are only interested in those here on the *jms* server controller
         for (ServerSession session : sessions) {
            if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) {
               jmsSessions.put(session.getConnectionID(), session);
            }
         }

         for (RemotingConnection connection : connections) {
            ServerSession session = jmsSessions.get(connection.getID());
            if (session == null) {
               // no JMS session on this connection: not reported
               continue;
            }

            JsonObjectBuilder objectBuilder = JsonLoader.createObjectBuilder()
               .add("connectionID", connection.getID().toString())
               .add("clientAddress", connection.getRemoteAddress())
               .add("creationTime", connection.getCreationTime());

            String clientID = session.getMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY);
            if (clientID != null) {
               objectBuilder.add("clientID", clientID);
            }

            String principal = session.getUsername();
            if (principal != null) {
               objectBuilder.add("principal", principal);
            }

            array.add(objectBuilder.build());
         }
         return array.build().toString();
      } finally {
         blockOnIO();
      }
   }

   /**
    * Lists, as a JSON array, the consumers of every session on the given connection.
    *
    * @param connectionID string form of the connection ID; a {@code null} or unknown ID
    *                     yields an empty array
    */
   @Override
   public String listConsumersAsJSON(String connectionID) throws Exception {
      checkStarted();

      clearIO();

      try {
         JsonArrayBuilder array = JsonLoader.createArrayBuilder();

         Set<RemotingConnection> connections = server.getActiveMQServer().getRemotingService().getConnections();
         for (RemotingConnection connection : connections) {
            // receiver is the (non-null) connection ID so a null argument simply matches nothing
            if (connection.getID().toString().equals(connectionID)) {
               addConsumers(array, server.getActiveMQServer().getSessions(connectionID));
            }
         }
         return array.build().toString();
      } finally {
         blockOnIO();
      }
   }

   /** Lists, as a JSON array, the consumers of every session known to the core server. */
   @Override
   public String listAllConsumersAsJSON() throws Exception {
      checkStarted();

      clearIO();

      try {
         JsonArray jsonArray = toJsonArray(server.getActiveMQServer().getSessions());
         return jsonArray.toString();
      } finally {
         blockOnIO();
      }
   }

   @Override
   public String[] listSessions(final String connectionID) throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.listSessions(connectionID);
      } finally {
         blockOnIO();
      }
   }

   @Override
   public String listPreparedTransactionDetailsAsJSON() throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.listPreparedTransactionDetailsAsJSON();
      } finally {
         blockOnIO();
      }
   }

   @Override
   public String listPreparedTransactionDetailsAsHTML() throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.listPreparedTransactionDetailsAsHTML();
      } finally {
         blockOnIO();
      }
   }

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------
   /* (non-Javadoc)
    * @see org.apache.activemq.artemis.core.management.impl.AbstractControl#fillMBeanOperationInfo()
    */
   @Override
   protected MBeanOperationInfo[] fillMBeanOperationInfo() {
      return MBeanInfoHelper.getMBeanOperationsInfo(JMSServerControl.class);
   }

   @Override
   protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
      return MBeanInfoHelper.getMBeanAttributesInfo(JMSServerControl.class);
   }

   // Private -------------------------------------------------------

   /**
    * Guards management operations against a JMS server that is not running.
    *
    * @throws IllegalStateException if the JMS server is not started
    */
   private void checkStarted() {
      if (!server.isStarted()) {
         throw new IllegalStateException("ActiveMQ Artemis JMS Server is not started. It can not be managed yet");
      }
   }

   // Inner classes -------------------------------------------------

   /**
    * Returns the target addresses of the session that belong to a registered JMS queue or
    * topic control; addresses without such a control are dropped.
    */
   @Override
   public String[] listTargetDestinations(String sessionID) throws Exception {
      String[] addresses = server.getActiveMQServer().getActiveMQServerControl().listTargetAddresses(sessionID);
      Map<String, DestinationControl> allDests = new HashMap<>();

      Object[] queueControls = server.getActiveMQServer().getManagementService().getResources(JMSQueueControl.class);
      for (Object resource : queueControls) {
         JMSQueueControl queueControl = (JMSQueueControl) resource;
         allDests.put(queueControl.getAddress(), queueControl);
      }

      Object[] topicControls = server.getActiveMQServer().getManagementService().getResources(TopicControl.class);
      for (Object resource : topicControls) {
         TopicControl topicControl = (TopicControl) resource;
         allDests.put(topicControl.getAddress(), topicControl);
      }

      List<String> destinations = new ArrayList<>();
      for (String address : addresses) {
         DestinationControl control = allDests.get(address);
         if (control != null) {
            destinations.add(control.getAddress());
         }
      }
      return destinations.toArray(new String[destinations.size()]);
   }

   /** Returns the session's last sent message ID for the address, or {@code null} if the session is unknown. */
   @Override
   public String getLastSentMessageID(String sessionID, String address) throws Exception {
      ServerSession session = server.getActiveMQServer().getSessionByID(sessionID);
      if (session != null) {
         return session.getLastSentMessageID(address);
      }
      return null;
   }

   /** Returns the session's creation time as a string, or {@code null} if the session is unknown. */
   @Override
   public String getSessionCreationTime(String sessionID) throws Exception {
      ServerSession session = server.getActiveMQServer().getSessionByID(sessionID);
      if (session != null) {
         return String.valueOf(session.getCreationTime());
      }
      return null;
   }

   @Override
   public String listSessionsAsJSON(final String connectionID) throws Exception {
      checkStarted();

      clearIO();

      try {
         return server.listSessionsAsJSON(connectionID);
      } finally {
         blockOnIO();
      }
   }

   /**
    * Lists, as a JSON array, the members of every cluster connection's topology.
    * <p>
    * A member with a live server contributes {@code nodeID}, {@code live} and, when present,
    * {@code backup} (both as {@code host:port}). A member without a live server contributes
    * an empty object.
    */
   @Override
   public String listNetworkTopology() throws Exception {
      checkStarted();

      clearIO();
      try {
         JsonArrayBuilder brokers = JsonLoader.createArrayBuilder();
         ClusterManager clusterManager = server.getActiveMQServer().getClusterManager();
         if (clusterManager != null) {
            Set<ClusterConnection> clusterConnections = clusterManager.getClusterConnections();
            for (ClusterConnection clusterConnection : clusterConnections) {
               Topology topology = clusterConnection.getTopology();
               Collection<TopologyMemberImpl> members = topology.getMembers();
               for (TopologyMemberImpl member : members) {

                  JsonObjectBuilder obj = JsonLoader.createObjectBuilder();
                  TransportConfiguration live = member.getLive();
                  if (live != null) {
                     obj.add("nodeID", member.getNodeId()).add("live", hostAndPort(live));
                     TransportConfiguration backup = member.getBackup();
                     if (backup != null) {
                        obj.add("backup", hostAndPort(backup));
                     }
                  }
                  brokers.add(obj);
               }
            }
         }
         return brokers.build().toString();
      } finally {
         blockOnIO();
      }
   }

   @Override
   public String closeConnectionWithClientID(final String clientID) throws Exception {
      return server.getActiveMQServer().destroyConnectionWithSessionMetadata(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
   }

   /**
    * Maps a queue to its JMS destination type from the routing type of its address.
    *
    * @param queue       the queue being described
    * @param addressInfo the already looked-up info of the queue's address; may be {@code null}
    * @return {@code "queue"}/{@code "tempqueue"} for ANYCAST, {@code "topic"}/{@code "temptopic"}
    *         for MULTICAST, or {@code null} when the queue is not related to JMS
    */
   private String determineJMSDestinationType(Queue queue, AddressInfo addressInfo) {
      AddressInfo.RoutingType routingType = addressInfo == null ? null : addressInfo.getRoutingType();
      if (routingType == AddressInfo.RoutingType.ANYCAST) {
         return queue.isTemporary() ? "tempqueue" : "queue";
      }
      if (routingType == AddressInfo.RoutingType.MULTICAST) {
         return queue.isTemporary() ? "temptopic" : "topic";
      }
      ActiveMQJMSServerLogger.LOGGER.debug("JMSServerControlImpl.determineJMSDestinationType() " + queue);
      // not related to JMS
      return null;
   }

   /**
    * Describes a consumer as a JSON object.
    *
    * @return the description, or {@code null} when the consumer's address is unknown to the
    *         server or is not a JMS destination (such consumers are left out of listings)
    */
   private JsonObject toJSONObject(ServerConsumer consumer) {
      Queue queue = consumer.getQueue();
      AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(queue.getAddress().toString()));
      if (addressInfo == null) {
         return null;
      }

      String destinationType = determineJMSDestinationType(queue, addressInfo);
      if (destinationType == null) {
         // JsonObjectBuilder rejects null values; a non-JMS consumer is skipped instead of failing the listing
         return null;
      }

      JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
         .add("consumerID", consumer.getID())
         .add("connectionID", consumer.getConnectionID().toString())
         .add("sessionID", consumer.getSessionID())
         .add("queueName", queue.getName().toString())
         .add("browseOnly", consumer.isBrowseOnly())
         .add("creationTime", consumer.getCreationTime())
         .add("destinationName", queue.getAddress().toString())
         .add("destinationType", destinationType);

      // JMS consumer with message filter use the queue's filter
      Filter queueFilter = queue.getFilter();
      if (queueFilter != null) {
         obj.add("filter", queueFilter.getFilterString().toString());
      }

      // only a consumer on a non-temporary MULTICAST queue is reported as durable
      boolean durable = addressInfo.getRoutingType() == AddressInfo.RoutingType.MULTICAST && !queue.isTemporary();
      obj.add("durable", durable);

      return obj.build();
   }

   /**
    * Re-broadcasts a core management notification as a JMX {@link Notification} when its
    * type is a {@link JMSNotificationType}; every other notification is ignored.
    * <p>
    * The JMX message is the notification's MESSAGE property, or {@code null} if absent.
    */
   @Override
   public void onNotification(org.apache.activemq.artemis.core.server.management.Notification notification) {
      if (!(notification.getType() instanceof JMSNotificationType)) {
         return;
      }
      JMSNotificationType type = (JMSNotificationType) notification.getType();
      TypedProperties prop = notification.getProperties();

      SimpleString message = prop == null ? null : prop.getSimpleStringProperty(JMSNotificationType.MESSAGE);
      String text = message == null ? null : message.toString();

      this.broadcaster.sendNotification(new Notification(type.toString(), this, notifSeq.incrementAndGet(), text));
   }

   /** Appends the JSON description of every describable consumer of the given sessions. */
   private void addConsumers(JsonArrayBuilder array, Collection<ServerSession> sessions) {
      for (ServerSession session : sessions) {
         Set<ServerConsumer> consumers = session.getServerConsumers();
         for (ServerConsumer consumer : consumers) {
            JsonObject obj = toJSONObject(consumer);
            if (obj != null) {
               array.add(obj);
            }
         }
      }
   }

   /** Builds a JSON array describing the consumers of the given sessions. */
   private JsonArray toJsonArray(Collection<ServerSession> sessions) {
      JsonArrayBuilder array = JsonLoader.createArrayBuilder();
      addConsumers(array, sessions);
      return array.build();
   }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0189f156/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java deleted file mode 100644 index d215b59..0000000 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java +++ /dev/null @@ -1,370 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.jms.management.impl; - -import javax.json.JsonArrayBuilder; -import javax.json.JsonObject; -import javax.management.MBeanInfo; -import javax.management.StandardMBean; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; -import org.apache.activemq.artemis.api.core.management.AddressControl; -import org.apache.activemq.artemis.api.core.management.QueueControl; -import org.apache.activemq.artemis.api.core.management.ResourceNames; -import org.apache.activemq.artemis.api.jms.management.TopicControl; -import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper; -import org.apache.activemq.artemis.core.server.management.ManagementService; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.jms.client.ActiveMQMessage; -import org.apache.activemq.artemis.jms.server.JMSServerManager; -import org.apache.activemq.artemis.utils.JsonLoader; -import org.apache.activemq.artemis.utils.SelectorTranslator; -import org.jboss.logging.Logger; - -import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe; - -public class JMSTopicControlImpl extends StandardMBean implements TopicControl { - - private final ActiveMQDestination managedTopic; - - private final AddressControl addressControl; - - private final ManagementService managementService; - - private final JMSServerManager jmsServerManager; - - private static final Logger logger = Logger.getLogger(JMSTopicControlImpl.class); - - // Static -------------------------------------------------------- - - public static String createFilterFromJMSSelector(final String selectorStr) throws ActiveMQException { - return selectorStr == null || selectorStr.trim().length() == 0 ? 
null : SelectorTranslator.convertToActiveMQFilterString(selectorStr); - } - - // Constructors -------------------------------------------------- - - public JMSTopicControlImpl(final ActiveMQDestination topic, - final JMSServerManager jmsServerManager, - final AddressControl addressControl, - final ManagementService managementService) throws Exception { - super(TopicControl.class); - this.jmsServerManager = jmsServerManager; - managedTopic = topic; - this.addressControl = addressControl; - this.managementService = managementService; - } - - // TopicControlMBean implementation ------------------------------ - - @Override - public void addBinding(String binding) throws Exception { - jmsServerManager.addTopicToBindingRegistry(managedTopic.getName(), binding); - } - - @Override - public String[] getRegistryBindings() { - return jmsServerManager.getBindingsOnTopic(managedTopic.getName()); - } - - @Override - public String getName() { - return managedTopic.getName(); - } - - @Override - public boolean isTemporary() { - return managedTopic.isTemporary(); - } - - @Override - public String getAddress() { - return managedTopic.getAddress(); - } - - @Override - public long getMessageCount() { - return getMessageCount(DurabilityType.ALL); - } - - @Override - public int getDeliveringCount() { - List<QueueControl> queues = getQueues(DurabilityType.ALL); - int count = 0; - for (QueueControl queue : queues) { - count += queue.getDeliveringCount(); - } - return count; - } - - @Override - public long getMessagesAdded() { - List<QueueControl> queues = getQueues(DurabilityType.ALL); - int count = 0; - for (QueueControl queue : queues) { - count += queue.getMessagesAdded(); - } - return count; - } - - @Override - public int getDurableMessageCount() { - return getMessageCount(DurabilityType.DURABLE); - } - - @Override - public int getNonDurableMessageCount() { - return getMessageCount(DurabilityType.NON_DURABLE); - } - - @Override - public int getSubscriptionCount() { - return 
getQueues(DurabilityType.ALL).size(); - } - - @Override - public int getDurableSubscriptionCount() { - return getQueues(DurabilityType.DURABLE).size(); - } - - @Override - public int getNonDurableSubscriptionCount() { - return getQueues(DurabilityType.NON_DURABLE).size(); - } - - @Override - public Object[] listAllSubscriptions() { - return listSubscribersInfos(DurabilityType.ALL); - } - - @Override - public String listAllSubscriptionsAsJSON() throws Exception { - return listSubscribersInfosAsJSON(DurabilityType.ALL); - } - - @Override - public Object[] listDurableSubscriptions() { - return listSubscribersInfos(DurabilityType.DURABLE); - } - - @Override - public String listDurableSubscriptionsAsJSON() throws Exception { - return listSubscribersInfosAsJSON(DurabilityType.DURABLE); - } - - @Override - public Object[] listNonDurableSubscriptions() { - return listSubscribersInfos(DurabilityType.NON_DURABLE); - } - - @Override - public String listNonDurableSubscriptionsAsJSON() throws Exception { - return listSubscribersInfosAsJSON(DurabilityType.NON_DURABLE); - } - - @Override - public Map<String, Object>[] listMessagesForSubscription(final String queueName) throws Exception { - QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queueName); - if (coreQueueControl == null) { - throw new IllegalArgumentException("No subscriptions with name " + queueName); - } - - Map<String, Object>[] coreMessages = coreQueueControl.listMessages(null); - - Map<String, Object>[] jmsMessages = new Map[coreMessages.length]; - - int i = 0; - - for (Map<String, Object> coreMessage : coreMessages) { - jmsMessages[i++] = ActiveMQMessage.coreMaptoJMSMap(coreMessage); - } - return jmsMessages; - } - - @Override - public String listMessagesForSubscriptionAsJSON(final String queueName) throws Exception { - return JMSQueueControlImpl.toJSON(listMessagesForSubscription(queueName)); - } - - @Override - public int countMessagesForSubscription(final 
String clientID, - final String subscriptionName, - final String filterStr) throws Exception { - String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName); - QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queueName); - if (coreQueueControl == null) { - throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID); - } - String filter = JMSTopicControlImpl.createFilterFromJMSSelector(filterStr); - return coreQueueControl.listMessages(filter).length; - } - - @Override - public int removeMessages(final String filterStr) throws Exception { - String filter = JMSTopicControlImpl.createFilterFromJMSSelector(filterStr); - int count = 0; - String[] queues = addressControl.getQueueNames(); - for (String queue : queues) { - QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queue); - if (coreQueueControl != null) { - count += coreQueueControl.removeMessages(filter); - } - } - - return count; - } - - @Override - public void dropDurableSubscription(final String clientID, final String subscriptionName) throws Exception { - String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName); - QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queueName); - if (coreQueueControl == null) { - throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID); - } - ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER); - serverControl.destroyQueue(queueName, true); - } - - @Override - public void dropAllSubscriptions() throws Exception { - ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER); - String[] queues = 
addressControl.getQueueNames(); - for (String queue : queues) { - // Drop all subscription shouldn't delete the dummy queue used to identify if the topic exists on the core queues. - // we will just ignore this queue - if (!queue.equals(managedTopic.getAddress())) { - serverControl.destroyQueue(queue); - } - } - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - private Object[] listSubscribersInfos(final DurabilityType durability) { - List<QueueControl> queues = getQueues(durability); - List<Object[]> subInfos = new ArrayList<>(queues.size()); - - for (QueueControl queue : queues) { - String clientID = null; - String subName = null; - - if (queue.isDurable()) { - Pair<String, String> pair = ActiveMQDestination.decomposeQueueNameForDurableSubscription(queue.getName()); - clientID = pair.getA(); - subName = pair.getB(); - } - - String filter = queue.getFilter() != null ? 
queue.getFilter() : null; - - Object[] subscriptionInfo = new Object[6]; - subscriptionInfo[0] = queue.getName(); - subscriptionInfo[1] = clientID; - subscriptionInfo[2] = subName; - subscriptionInfo[3] = queue.isDurable(); - subscriptionInfo[4] = queue.getMessageCount(); - subscriptionInfo[5] = filter; - subInfos.add(subscriptionInfo); - } - return subInfos.toArray(new Object[subInfos.size()]); - } - - private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception { - try { - List<QueueControl> queues = getQueues(durability); - JsonArrayBuilder array = JsonLoader.createArrayBuilder(); - - for (QueueControl queue : queues) { - String clientID = null; - String subName = null; - - if (queue.isDurable()) { - Pair<String, String> pair = ActiveMQDestination.decomposeQueueNameForDurableSubscription(queue.getName()); - clientID = pair.getA(); - subName = pair.getB(); - } else { - // in the case of heirarchical topics the queue name will not follow the <part>.<part> pattern of normal - // durable subscribers so skip decomposing the name for the client ID and subscription name and just - // hard-code it - clientID = ""; - subName = ""; - } - - String filter = queue.getFilter() != null ? 
queue.getFilter() : null; - - JsonObject info = JsonLoader.createObjectBuilder().add("queueName", queue.getName()).add("clientID", nullSafe(clientID)).add("selector", nullSafe(filter)).add("name", nullSafe(subName)).add("durable", queue.isDurable()).add("messageCount", queue.getMessageCount()).add("deliveringCount", queue.getDeliveringCount()).add("consumers", queue.listConsumersAsJSON()).build(); - - array.add(info); - } - - return array.build().toString(); - } catch (Exception e) { - logger.warn("Unable to list subscribers as JSON", e.getMessage(), e); - return e.toString(); - } - } - - private int getMessageCount(final DurabilityType durability) { - List<QueueControl> queues = getQueues(durability); - int count = 0; - for (QueueControl queue : queues) { - count += queue.getMessageCount(); - } - return count; - } - - private List<QueueControl> getQueues(final DurabilityType durability) { - try { - List<QueueControl> matchingQueues = new ArrayList<>(); - String[] queues = addressControl.getQueueNames(); - for (String queue : queues) { - QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.CORE_QUEUE + queue); - - // Ignore the "special" subscription - if (coreQueueControl != null && !coreQueueControl.getName().equals(addressControl.getAddress())) { - if (durability == DurabilityType.ALL || durability == DurabilityType.DURABLE && coreQueueControl.isDurable() || - durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()) { - matchingQueues.add(coreQueueControl); - } - } - } - return matchingQueues; - } catch (Exception e) { - return Collections.emptyList(); - } - } - - @Override - public MBeanInfo getMBeanInfo() { - MBeanInfo info = super.getMBeanInfo(); - return new MBeanInfo(info.getClassName(), info.getDescription(), MBeanInfoHelper.getMBeanAttributesInfo(TopicControl.class), info.getConstructors(), MBeanInfoHelper.getMBeanOperationsInfo(TopicControl.class), info.getNotifications()); - } - - // Inner classes 
------------------------------------------------- - - private enum DurabilityType { - ALL, DURABLE, NON_DURABLE - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0189f156/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java deleted file mode 100644 index dc5b33b..0000000 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.jms.management.impl.openmbean; - -public interface JMSCompositeDataConstants { - - String JMS_DESTINATION = "JMSDestination"; - String JMS_MESSAGE_ID = "JMSMessageID"; - String JMS_TYPE = "JMSType"; - String JMS_DELIVERY_MODE = "JMSDeliveryMode"; - String JMS_EXPIRATION = "JMSExpiration"; - String JMS_PRIORITY = "JMSPriority"; - String JMS_REDELIVERED = "JMSRedelivered"; - String JMS_TIMESTAMP = "JMSTimestamp"; - String JMSXGROUP_SEQ = "JMSXGroupSeq"; - String JMSXGROUP_ID = "JMSXGroupID"; - String JMSXUSER_ID = "JMSXUserID"; - String JMS_CORRELATION_ID = "JMSCorrelationID"; - String ORIGINAL_DESTINATION = "OriginalDestination"; - String JMS_REPLY_TO = "JMSReplyTo"; - - String JMS_DESTINATION_DESCRIPTION = "The message destination"; - String JMS_MESSAGE_ID_DESCRIPTION = "The message ID"; - String JMS_TYPE_DESCRIPTION = "The message type"; - String JMS_DELIVERY_MODE_DESCRIPTION = "The message delivery mode"; - String JMS_EXPIRATION_DESCRIPTION = "The message expiration"; - String JMS_PRIORITY_DESCRIPTION = "The message priority"; - String JMS_REDELIVERED_DESCRIPTION = "Is the message redelivered"; - String JMS_TIMESTAMP_DESCRIPTION = "The message timestamp"; - String JMSXGROUP_SEQ_DESCRIPTION = "The message group sequence number"; - String JMSXGROUP_ID_DESCRIPTION = "The message group ID"; - String JMSXUSER_ID_DESCRIPTION = "The user that sent the message"; - String JMS_CORRELATION_ID_DESCRIPTION = "The message correlation ID"; - String ORIGINAL_DESTINATION_DESCRIPTION = "Original Destination Before Senting To DLQ"; - String JMS_REPLY_TO_DESCRIPTION = "The reply to address"; - - String BODY_LENGTH = "BodyLength"; - String BODY_PREVIEW = "BodyPreview"; - String CONTENT_MAP = "ContentMap"; - String MESSAGE_TEXT = "Text"; - String MESSAGE_URL = "Url"; - -} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0189f156/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java deleted file mode 100644 index 285657d..0000000 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java +++ /dev/null @@ -1,357 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.jms.management.impl.openmbean; - -import javax.management.openmbean.ArrayType; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants; -import org.apache.activemq.artemis.reader.MapMessageUtil; -import org.apache.activemq.artemis.utils.TypedProperties; - -public final class JMSOpenTypeSupport { - - public interface OpenTypeFactory { - - CompositeType getCompositeType() throws OpenDataException; - - Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException; - } - - private static final Map<Byte, AbstractOpenTypeFactory> OPEN_TYPE_FACTORIES = new HashMap<>(); - - public abstract static class AbstractOpenTypeFactory implements OpenTypeFactory { - - private CompositeType compositeType; - private final List<String> itemNamesList = new ArrayList<>(); - private final List<String> itemDescriptionsList = new ArrayList<>(); - private final List<OpenType> itemTypesList = new ArrayList<>(); - - @Override - public CompositeType getCompositeType() throws OpenDataException { - if (compositeType == null) { - init(); - compositeType = createCompositeType(); - } - return compositeType; - } - - protected void init() throws 
OpenDataException { - } - - protected CompositeType createCompositeType() throws OpenDataException { - String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]); - String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]); - OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]); - return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes); - } - - protected abstract String getTypeName(); - - protected void addItem(String name, String description, OpenType type) { - itemNamesList.add(name); - itemDescriptionsList.add(description); - itemTypesList.add(type); - } - - protected String getDescription() { - return getTypeName(); - } - - @Override - public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException { - Map<String, Object> rc = new HashMap<>(); - return rc; - } - } - - static class MessageOpenTypeFactory extends AbstractOpenTypeFactory { - - protected TabularType stringPropertyTabularType; - protected TabularType booleanPropertyTabularType; - protected TabularType bytePropertyTabularType; - protected TabularType shortPropertyTabularType; - protected TabularType intPropertyTabularType; - protected TabularType longPropertyTabularType; - protected TabularType floatPropertyTabularType; - protected TabularType doublePropertyTabularType; - - protected ArrayType body; - - @Override - protected String getTypeName() { - return Message.class.getName(); - } - - @Override - protected void init() throws OpenDataException { - super.init(); - - addItem(JMSCompositeDataConstants.JMS_DESTINATION, JMSCompositeDataConstants.JMS_DESTINATION_DESCRIPTION, SimpleType.STRING); - addItem(JMSCompositeDataConstants.JMS_MESSAGE_ID, JMSCompositeDataConstants.JMS_MESSAGE_ID_DESCRIPTION, SimpleType.STRING); - addItem(JMSCompositeDataConstants.JMS_CORRELATION_ID, JMSCompositeDataConstants.JMS_CORRELATION_ID_DESCRIPTION, SimpleType.STRING); - 
addItem(JMSCompositeDataConstants.JMS_TYPE, JMSCompositeDataConstants.JMS_TYPE_DESCRIPTION, SimpleType.STRING); - addItem(JMSCompositeDataConstants.JMS_DELIVERY_MODE, JMSCompositeDataConstants.JMS_DELIVERY_MODE_DESCRIPTION, SimpleType.STRING); - addItem(JMSCompositeDataConstants.JMS_EXPIRATION, JMSCompositeDataConstants.JMS_EXPIRATION_DESCRIPTION, SimpleType.LONG); - addItem(JMSCompositeDataConstants.JMS_PRIORITY, JMSCompositeDataConstants.JMS_PRIORITY_DESCRIPTION, SimpleType.INTEGER); - addItem(JMSCompositeDataConstants.JMS_REDELIVERED, JMSCompositeDataConstants.JMS_REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN); - addItem(JMSCompositeDataConstants.JMS_TIMESTAMP, JMSCompositeDataConstants.JMS_TIMESTAMP_DESCRIPTION, SimpleType.DATE); - addItem(JMSCompositeDataConstants.JMSXGROUP_ID, JMSCompositeDataConstants.JMSXGROUP_ID_DESCRIPTION, SimpleType.STRING); - addItem(JMSCompositeDataConstants.JMSXGROUP_SEQ, JMSCompositeDataConstants.JMSXGROUP_SEQ_DESCRIPTION, SimpleType.INTEGER); - addItem(JMSCompositeDataConstants.JMSXUSER_ID, JMSCompositeDataConstants.JMSXUSER_ID_DESCRIPTION, SimpleType.STRING); - addItem(JMSCompositeDataConstants.JMS_REPLY_TO, JMSCompositeDataConstants.JMS_REPLY_TO_DESCRIPTION, SimpleType.STRING); - addItem(JMSCompositeDataConstants.ORIGINAL_DESTINATION, JMSCompositeDataConstants.ORIGINAL_DESTINATION_DESCRIPTION, SimpleType.STRING); - addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING); - - // now lets expose the type safe properties - stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING); - booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN); - bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE); - shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT); - intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER); - longPropertyTabularType = createTabularType(Long.class, 
SimpleType.LONG); - floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT); - doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE); - - addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType); - addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType); - addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType); - addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType); - addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType); - addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType); - addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType); - addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType); - } - - @Override - public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException { - Map<String, Object> rc = super.getFields(data); - putString(rc, data, JMSCompositeDataConstants.JMS_MESSAGE_ID, CompositeDataConstants.USER_ID); - putString(rc, data, JMSCompositeDataConstants.JMS_DESTINATION, CompositeDataConstants.ADDRESS); - putStringProperty(rc, data, JMSCompositeDataConstants.JMS_REPLY_TO, "JMSReplyTo"); - rc.put(JMSCompositeDataConstants.JMS_TYPE, getType()); - rc.put(JMSCompositeDataConstants.JMS_DELIVERY_MODE, ((Boolean) data.get(CompositeDataConstants.DURABLE)) ? 
"PERSISTENT" : "NON-PERSISTENT"); - rc.put(JMSCompositeDataConstants.JMS_EXPIRATION, data.get(CompositeDataConstants.EXPIRATION)); - rc.put(JMSCompositeDataConstants.JMS_TIMESTAMP, new Date((Long) data.get(CompositeDataConstants.TIMESTAMP))); - rc.put(JMSCompositeDataConstants.JMS_PRIORITY, ((Byte) data.get(CompositeDataConstants.PRIORITY)).intValue()); - putStringProperty(rc, data, JMSCompositeDataConstants.JMS_CORRELATION_ID, JMSCompositeDataConstants.JMS_CORRELATION_ID); - rc.put(JMSCompositeDataConstants.JMS_REDELIVERED, data.get(CompositeDataConstants.REDELIVERED)); - putStringProperty(rc, data, JMSCompositeDataConstants.JMSXGROUP_ID, Message.HDR_GROUP_ID.toString()); - putIntProperty(rc, data, JMSCompositeDataConstants.JMSXGROUP_SEQ, JMSCompositeDataConstants.JMSXGROUP_SEQ); - putStringProperty(rc, data, JMSCompositeDataConstants.JMSXUSER_ID, Message.HDR_VALIDATED_USER.toString()); - putStringProperty(rc, data, JMSCompositeDataConstants.ORIGINAL_DESTINATION, Message.HDR_ORIGINAL_ADDRESS.toString()); - - rc.put(CompositeDataConstants.PROPERTIES, "" + data.get(CompositeDataConstants.PROPERTIES)); - - rc.put(CompositeDataConstants.STRING_PROPERTIES, data.get(CompositeDataConstants.STRING_PROPERTIES)); - rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, data.get(CompositeDataConstants.BOOLEAN_PROPERTIES)); - rc.put(CompositeDataConstants.BYTE_PROPERTIES, data.get(CompositeDataConstants.BYTE_PROPERTIES)); - rc.put(CompositeDataConstants.SHORT_PROPERTIES, data.get(CompositeDataConstants.SHORT_PROPERTIES)); - rc.put(CompositeDataConstants.INT_PROPERTIES, data.get(CompositeDataConstants.INT_PROPERTIES)); - rc.put(CompositeDataConstants.LONG_PROPERTIES, data.get(CompositeDataConstants.LONG_PROPERTIES)); - rc.put(CompositeDataConstants.FLOAT_PROPERTIES, data.get(CompositeDataConstants.FLOAT_PROPERTIES)); - rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, data.get(CompositeDataConstants.DOUBLE_PROPERTIES)); - - return rc; - } - - private void putString(Map<String, 
Object> rc, CompositeDataSupport data, String target, String source) { - String prop = (String) data.get(source); - if (prop != null) { - rc.put(target, prop); - } else { - rc.put(target, ""); - } - } - - private void putStringProperty(Map<String, Object> rc, CompositeDataSupport data, String target, String source) { - TabularDataSupport properties = (TabularDataSupport) data.get(CompositeDataConstants.STRING_PROPERTIES); - Object[] keys = new Object[]{source}; - CompositeDataSupport cds = (CompositeDataSupport) properties.get(keys); - String prop = ""; - if (cds != null && cds.get("value") != null) { - prop = (String) cds.get("value"); - } - rc.put(target, prop); - } - - private void putIntProperty(Map<String, Object> rc, CompositeDataSupport data, String target, String source) { - TabularDataSupport properties = (TabularDataSupport) data.get(CompositeDataConstants.INT_PROPERTIES); - Object[] keys = new Object[]{source}; - CompositeDataSupport cds = (CompositeDataSupport) properties.get(keys); - Integer prop = 0; - if (cds != null && cds.get("value") != null) { - prop = (Integer) cds.get("value"); - } - rc.put(target, prop); - } - - private String getType() { - return "Message"; - } - - protected String toString(Object value) { - if (value == null) { - return null; - } - return value.toString(); - } - - protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException { - String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">"; - String[] keyValue = new String[]{"key", "value"}; - OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType}; - CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes); - return new TabularType(typeName, typeName, rowType, new String[]{"key"}); - } - } - - static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory { - - @Override - protected String getTypeName() { - return "BytesMessage"; - } - - @Override - protected void 
init() throws OpenDataException { - super.init(); - addItem(JMSCompositeDataConstants.BODY_LENGTH, "Body length", SimpleType.LONG); - addItem(JMSCompositeDataConstants.BODY_PREVIEW, "Body preview", new ArrayType(SimpleType.BYTE, true)); - } - - @Override - public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException { - Map<String, Object> rc = super.getFields(data); - ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body")); - long length = 0; - length = buffer.readableBytes(); - rc.put(JMSCompositeDataConstants.BODY_LENGTH, Long.valueOf(length)); - byte[] preview = new byte[(int) Math.min(length, 255)]; - buffer.readBytes(preview); - rc.put(JMSCompositeDataConstants.BODY_PREVIEW, preview); - return rc; - } - } - - static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory { - - @Override - protected String getTypeName() { - return "MapMessage"; - } - - @Override - protected void init() throws OpenDataException { - super.init(); - addItem(JMSCompositeDataConstants.CONTENT_MAP, "Content map", SimpleType.STRING); - } - - @Override - public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException { - Map<String, Object> rc = super.getFields(data); - ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body")); - TypedProperties properties = new TypedProperties(); - MapMessageUtil.readBodyMap(buffer, properties); - rc.put(JMSCompositeDataConstants.CONTENT_MAP, "" + properties.getMap()); - return rc; - } - } - - static class ObjectMessageOpenTypeFactory extends MessageOpenTypeFactory { - - @Override - protected String getTypeName() { - return "ObjectMessage"; - } - } - - static class StreamMessageOpenTypeFactory extends MessageOpenTypeFactory { - - @Override - protected String getTypeName() { - return "StreamMessage"; - } - } - - static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory { - - @Override - protected String getTypeName() { - return 
"TextMessage"; - } - - @Override - protected void init() throws OpenDataException { - super.init(); - addItem(JMSCompositeDataConstants.MESSAGE_TEXT, JMSCompositeDataConstants.MESSAGE_TEXT, SimpleType.STRING); - } - - @Override - public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException { - Map<String, Object> rc = super.getFields(data); - ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body")); - SimpleString value = buffer.readNullableSimpleString(); - rc.put(JMSCompositeDataConstants.MESSAGE_TEXT, value != null ? value.toString() : ""); - return rc; - } - - } - - static { - OPEN_TYPE_FACTORIES.put(Message.DEFAULT_TYPE, new MessageOpenTypeFactory()); - OPEN_TYPE_FACTORIES.put(Message.TEXT_TYPE, new TextMessageOpenTypeFactory()); - OPEN_TYPE_FACTORIES.put(Message.BYTES_TYPE, new ByteMessageOpenTypeFactory()); - OPEN_TYPE_FACTORIES.put(Message.MAP_TYPE, new MapMessageOpenTypeFactory()); - OPEN_TYPE_FACTORIES.put(Message.OBJECT_TYPE, new ObjectMessageOpenTypeFactory()); - OPEN_TYPE_FACTORIES.put(Message.STREAM_TYPE, new StreamMessageOpenTypeFactory()); - } - - private JMSOpenTypeSupport() { - } - - public static OpenTypeFactory getFactory(Byte type) throws OpenDataException { - return OPEN_TYPE_FACTORIES.get(type); - } - - public static CompositeData convert(CompositeDataSupport data) throws OpenDataException { - OpenTypeFactory f = getFactory((Byte) data.get("type")); - if (f == null) { - throw new OpenDataException("Cannot create a CompositeData for type: " + data.get("type")); - } - CompositeType ct = f.getCompositeType(); - Map<String, Object> fields = f.getFields(data); - return new CompositeDataSupport(ct, fields); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0189f156/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java ---------------------------------------------------------------------- diff --git 
a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index 191e117..25ad349 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -86,9 +86,7 @@ import org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration; import org.apache.activemq.artemis.jms.server.config.TopicConfiguration; import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl; import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration; -import org.apache.activemq.artemis.jms.server.management.JMSManagementService; import org.apache.activemq.artemis.jms.server.management.JMSNotificationType; -import org.apache.activemq.artemis.jms.server.management.impl.JMSManagementServiceImpl; import org.apache.activemq.artemis.jms.transaction.JMSTransactionDetail; import org.apache.activemq.artemis.spi.core.naming.BindingRegistry; import org.apache.activemq.artemis.utils.JsonLoader; @@ -134,8 +132,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback private final ActiveMQServer server; - private JMSManagementService jmsManagementService; - private boolean startCalled; private boolean active; @@ -194,10 +190,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback try { - jmsManagementService = new JMSManagementServiceImpl(server.getManagementService(), server, this); - - jmsManagementService.registerJMSServer(this); - // Must be set to active before calling initJournal active = true; @@ -252,15 +244,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback topicBindings.clear(); topics.clear(); - // it could be null if a backup - if 
(jmsManagementService != null) { - jmsManagementService.unregisterJMSServer(); - - jmsManagementService.stop(); - } - - jmsManagementService = null; - active = false; } } catch (Exception e) { @@ -391,7 +374,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback // server.setJMSQueueCreator(new JMSDestinationCreator()); // // server.setJMSQueueDeleter(new JMSQueueDeleter()); - server.registerActivateCallback(this); // server.registerPostQueueCreationCallback(new JMSPostQueueCreationCallback()); @@ -803,8 +785,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback queues.remove(name); queueBindings.remove(name); - jmsManagementService.unregisterQueue(name); - storage.deleteDestination(PersistedType.Queue, name); sendNotification(JMSNotificationType.QUEUE_DESTROYED, name); @@ -822,7 +802,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback @Override public synchronized boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception { checkInitialised(); - AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + name); + AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + name); if (addressControl != null) { for (String queueName : addressControl.getQueueNames()) { Binding binding = server.getPostOffice().getBinding(new SimpleString(queueName)); @@ -843,8 +823,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback topics.remove(name); topicBindings.remove(name); - jmsManagementService.unregisterTopic(name); - storage.deleteDestination(PersistedType.Topic, name); sendNotification(JMSNotificationType.TOPIC_DESTROYED, name); @@ -1100,8 +1078,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback this.recoverregistryBindings(queueName, PersistedType.Queue); - 
jmsManagementService.registerQueue(activeMQQueue, queue); - return true; } } @@ -1136,8 +1112,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback this.recoverregistryBindings(topicName, PersistedType.Topic); - jmsManagementService.registerTopic(activeMQTopic); - return true; } } @@ -1157,8 +1131,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback connectionFactories.put(cfConfig.getName(), cf); - jmsManagementService.registerConnectionFactory(cfConfig.getName(), cfConfig, cf); - return cf; } @@ -1284,8 +1256,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback connectionFactoryBindings.remove(name); connectionFactories.remove(name); - jmsManagementService.unregisterConnectionFactory(name); - return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0189f156/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/JMSManagementService.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/JMSManagementService.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/JMSManagementService.java deleted file mode 100644 index ff6c240..0000000 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/JMSManagementService.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.jms.server.management; - -import org.apache.activemq.artemis.api.jms.management.JMSServerControl; -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.jms.client.ActiveMQQueue; -import org.apache.activemq.artemis.jms.client.ActiveMQTopic; -import org.apache.activemq.artemis.jms.server.JMSServerManager; -import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration; - -public interface JMSManagementService { - - JMSServerControl registerJMSServer(JMSServerManager server) throws Exception; - - void unregisterJMSServer() throws Exception; - - void registerQueue(ActiveMQQueue queue, Queue serverQueue) throws Exception; - - void unregisterQueue(String name) throws Exception; - - void registerTopic(ActiveMQTopic topic) throws Exception; - - void unregisterTopic(String name) throws Exception; - - void registerConnectionFactory(String name, - ConnectionFactoryConfiguration config, - ActiveMQConnectionFactory connectionFactory) throws Exception; - - void unregisterConnectionFactory(String name) throws Exception; - - void stop() throws Exception; -}
