// http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSConnectionFactoryControlImpl.java
// ----------------------------------------------------------------------
// diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSConnectionFactoryControlImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSConnectionFactoryControlImpl.java
// new file mode 100644 index 0000000..74fa619 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSConnectionFactoryControlImpl.java @@ -0,0 +1,471 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.jms.management.impl;

import javax.management.MBeanInfo;
import javax.management.NotCompliantMBeanException;
import javax.management.StandardMBean;

import org.apache.activemq6.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq6.api.core.TransportConfiguration;
import org.apache.activemq6.api.core.management.Parameter;
import org.apache.activemq6.api.jms.management.ConnectionFactoryControl;
import org.apache.activemq6.core.management.impl.MBeanInfoHelper;
import org.apache.activemq6.jms.client.HornetQConnectionFactory;
import org.apache.activemq6.jms.server.JMSServerManager;
import org.apache.activemq6.jms.server.config.ConnectionFactoryConfiguration;

/**
 * Management (MBean) facade over one JMS connection factory.
 * <p>
 * Most attributes are read from the {@link ConnectionFactoryConfiguration} held by
 * this control. Every attribute setter writes the new value into that configuration
 * and then asks the {@link JMSServerManager} to recreate the connection factory from
 * it; the factory reference held here is replaced by the recreated instance.
 * A few read-only attributes are answered by the current factory instance instead of
 * the configuration (see the individual methods).
 *
 * @author Jeff Mesnil
 */
public class JMSConnectionFactoryControlImpl extends StandardMBean implements ConnectionFactoryControl {

    /** Configuration that setters mutate and from which the factory is rebuilt. */
    private final ConnectionFactoryConfiguration cfConfig;

    /** Current factory instance; replaced each time the configuration is re-applied. */
    private HornetQConnectionFactory cf;

    /** Name under which the factory is known to the JMS server manager. */
    private final String name;

    private final JMSServerManager jmsManager;

    /**
     * @param cfConfig   configuration backing the managed attributes
     * @param cf         the factory instance currently built from {@code cfConfig}
     * @param jmsManager server manager used for JNDI operations and factory recreation
     * @param name       name of the connection factory
     * @throws NotCompliantMBeanException propagated from the {@link StandardMBean} constructor
     */
    public JMSConnectionFactoryControlImpl(final ConnectionFactoryConfiguration cfConfig,
                                           final HornetQConnectionFactory cf,
                                           final JMSServerManager jmsManager,
                                           final String name) throws NotCompliantMBeanException {
        super(ConnectionFactoryControl.class);
        this.name = name;
        this.jmsManager = jmsManager;
        this.cfConfig = cfConfig;
        this.cf = cf;
    }

    // ---------------------------------------------------------------
    // Identity and JNDI
    // ---------------------------------------------------------------

    public String getName() {
        return this.name;
    }

    public String[] getJNDIBindings() {
        return this.jmsManager.getJNDIOnConnectionFactory(this.name);
    }

    public void addJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception {
        this.jmsManager.addConnectionFactoryToJNDI(this.name, jndi);
    }

    public void removeJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception {
        this.jmsManager.removeConnectionFactoryFromJNDI(this.name, jndi);
    }

    // ---------------------------------------------------------------
    // Read-only attributes
    // ---------------------------------------------------------------

    public boolean isHA() {
        return this.cfConfig.isHA();
    }

    public int getFactoryType() {
        return this.cfConfig.getFactoryType().intValue();
    }

    /** Answered by the current factory instance rather than the configuration. */
    public int getInitialMessagePacketSize() {
        return this.cf.getInitialMessagePacketSize();
    }

    /** Answered by the current factory instance rather than the configuration. */
    public TransportConfiguration[] getStaticConnectors() {
        return this.cf.getStaticConnectors();
    }

    /** Answered by the current factory instance rather than the configuration. */
    public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration() {
        return this.cf.getDiscoveryGroupConfiguration();
    }

    // ---------------------------------------------------------------
    // Client identity and grouping
    // ---------------------------------------------------------------

    public String getClientID() {
        return this.cfConfig.getClientID();
    }

    public void setClientID(final String clientID) {
        this.cfConfig.setClientID(clientID);
        applyConfiguration();
    }

    public String getGroupID() {
        return this.cfConfig.getGroupID();
    }

    public void setGroupID(final String groupID) {
        this.cfConfig.setGroupID(groupID);
        applyConfiguration();
    }

    public boolean isAutoGroup() {
        return this.cfConfig.isAutoGroup();
    }

    public void setAutoGroup(final boolean autoGroup) {
        this.cfConfig.setAutoGroup(autoGroup);
        applyConfiguration();
    }

    // ---------------------------------------------------------------
    // Timeouts and connection liveness
    // ---------------------------------------------------------------

    public long getClientFailureCheckPeriod() {
        return this.cfConfig.getClientFailureCheckPeriod();
    }

    public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod) {
        this.cfConfig.setClientFailureCheckPeriod(clientFailureCheckPeriod);
        applyConfiguration();
    }

    public long getConnectionTTL() {
        return this.cfConfig.getConnectionTTL();
    }

    public void setConnectionTTL(final long connectionTTL) {
        this.cfConfig.setConnectionTTL(connectionTTL);
        applyConfiguration();
    }

    public long getCallTimeout() {
        return this.cfConfig.getCallTimeout();
    }

    public void setCallTimeout(final long callTimeout) {
        this.cfConfig.setCallTimeout(callTimeout);
        applyConfiguration();
    }

    public long getCallFailoverTimeout() {
        return this.cfConfig.getCallFailoverTimeout();
    }

    public void setCallFailoverTimeout(final long callTimeout) {
        this.cfConfig.setCallFailoverTimeout(callTimeout);
        applyConfiguration();
    }

    // ---------------------------------------------------------------
    // Flow control
    // ---------------------------------------------------------------

    public int getConsumerWindowSize() {
        return this.cfConfig.getConsumerWindowSize();
    }

    public void setConsumerWindowSize(final int consumerWindowSize) {
        this.cfConfig.setConsumerWindowSize(consumerWindowSize);
        applyConfiguration();
    }

    public int getConsumerMaxRate() {
        return this.cfConfig.getConsumerMaxRate();
    }

    public void setConsumerMaxRate(final int consumerMaxRate) {
        this.cfConfig.setConsumerMaxRate(consumerMaxRate);
        applyConfiguration();
    }

    public int getConfirmationWindowSize() {
        return this.cfConfig.getConfirmationWindowSize();
    }

    public void setConfirmationWindowSize(final int confirmationWindowSize) {
        this.cfConfig.setConfirmationWindowSize(confirmationWindowSize);
        applyConfiguration();
    }

    public int getProducerWindowSize() {
        return this.cfConfig.getProducerWindowSize();
    }

    public void setProducerWindowSize(final int producerWindowSize) {
        this.cfConfig.setProducerWindowSize(producerWindowSize);
        applyConfiguration();
    }

    public int getProducerMaxRate() {
        return this.cfConfig.getProducerMaxRate();
    }

    public void setProducerMaxRate(final int producerMaxRate) {
        this.cfConfig.setProducerMaxRate(producerMaxRate);
        applyConfiguration();
    }

    // ---------------------------------------------------------------
    // Large messages
    // ---------------------------------------------------------------

    /** Answered by the current factory instance rather than the configuration. */
    public boolean isCompressLargeMessages() {
        return this.cf.isCompressLargeMessage();
    }

    public void setCompressLargeMessages(final boolean compress) {
        this.cfConfig.setCompressLargeMessages(compress);
        applyConfiguration();
    }

    public boolean isCacheLargeMessagesClient() {
        return this.cfConfig.isCacheLargeMessagesClient();
    }

    public void setCacheLargeMessagesClient(final boolean cacheLargeMessagesClient) {
        this.cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient);
        applyConfiguration();
    }

    public int getMinLargeMessageSize() {
        return this.cfConfig.getMinLargeMessageSize();
    }

    public void setMinLargeMessageSize(final int minLargeMessageSize) {
        this.cfConfig.setMinLargeMessageSize(minLargeMessageSize);
        applyConfiguration();
    }

    // ---------------------------------------------------------------
    // Blocking and acknowledgement
    // ---------------------------------------------------------------

    public boolean isBlockOnAcknowledge() {
        return this.cfConfig.isBlockOnAcknowledge();
    }

    public void setBlockOnAcknowledge(final boolean blockOnAcknowledge) {
        this.cfConfig.setBlockOnAcknowledge(blockOnAcknowledge);
        applyConfiguration();
    }

    public boolean isBlockOnDurableSend() {
        return this.cfConfig.isBlockOnDurableSend();
    }

    public void setBlockOnDurableSend(final boolean blockOnDurableSend) {
        this.cfConfig.setBlockOnDurableSend(blockOnDurableSend);
        applyConfiguration();
    }

    public boolean isBlockOnNonDurableSend() {
        return this.cfConfig.isBlockOnNonDurableSend();
    }

    public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) {
        this.cfConfig.setBlockOnNonDurableSend(blockOnNonDurableSend);
        applyConfiguration();
    }

    public boolean isPreAcknowledge() {
        return this.cfConfig.isPreAcknowledge();
    }

    public void setPreAcknowledge(final boolean preAcknowledge) {
        this.cfConfig.setPreAcknowledge(preAcknowledge);
        applyConfiguration();
    }

    public int getDupsOKBatchSize() {
        return this.cfConfig.getDupsOKBatchSize();
    }

    public void setDupsOKBatchSize(final int dupsOKBatchSize) {
        this.cfConfig.setDupsOKBatchSize(dupsOKBatchSize);
        applyConfiguration();
    }

    public int getTransactionBatchSize() {
        return this.cfConfig.getTransactionBatchSize();
    }

    public void setTransactionBatchSize(final int transactionBatchSize) {
        this.cfConfig.setTransactionBatchSize(transactionBatchSize);
        applyConfiguration();
    }

    // ---------------------------------------------------------------
    // Retry, reconnection and load balancing
    // ---------------------------------------------------------------

    public long getRetryInterval() {
        return this.cfConfig.getRetryInterval();
    }

    public void setRetryInterval(final long retryInterval) {
        this.cfConfig.setRetryInterval(retryInterval);
        applyConfiguration();
    }

    public double getRetryIntervalMultiplier() {
        return this.cfConfig.getRetryIntervalMultiplier();
    }

    public void setRetryIntervalMultiplier(final double retryIntervalMultiplier) {
        this.cfConfig.setRetryIntervalMultiplier(retryIntervalMultiplier);
        applyConfiguration();
    }

    public long getMaxRetryInterval() {
        return this.cfConfig.getMaxRetryInterval();
    }

    public void setMaxRetryInterval(final long retryInterval) {
        this.cfConfig.setMaxRetryInterval(retryInterval);
        applyConfiguration();
    }

    public int getReconnectAttempts() {
        return this.cfConfig.getReconnectAttempts();
    }

    public void setReconnectAttempts(final int reconnectAttempts) {
        this.cfConfig.setReconnectAttempts(reconnectAttempts);
        applyConfiguration();
    }

    public boolean isFailoverOnInitialConnection() {
        return this.cfConfig.isFailoverOnInitialConnection();
    }

    public void setFailoverOnInitialConnection(final boolean failover) {
        this.cfConfig.setFailoverOnInitialConnection(failover);
        applyConfiguration();
    }

    public String getConnectionLoadBalancingPolicyClassName() {
        return this.cfConfig.getLoadBalancingPolicyClassName();
    }

    public void setConnectionLoadBalancingPolicyClassName(final String name) {
        // The parameter shadows the field of the same name; the factory is still
        // recreated under this control's own name (see applyConfiguration()).
        this.cfConfig.setLoadBalancingPolicyClassName(name);
        applyConfiguration();
    }

    // ---------------------------------------------------------------
    // Thread pools
    // ---------------------------------------------------------------

    public boolean isUseGlobalPools() {
        return this.cfConfig.isUseGlobalPools();
    }

    public void setUseGlobalPools(final boolean useGlobalPools) {
        this.cfConfig.setUseGlobalPools(useGlobalPools);
        applyConfiguration();
    }

    public int getScheduledThreadPoolMaxSize() {
        return this.cfConfig.getScheduledThreadPoolMaxSize();
    }

    public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) {
        this.cfConfig.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
        applyConfiguration();
    }

    public int getThreadPoolMaxSize() {
        return this.cfConfig.getThreadPoolMaxSize();
    }

    public void setThreadPoolMaxSize(final int threadPoolMaxSize) {
        this.cfConfig.setThreadPoolMaxSize(threadPoolMaxSize);
        applyConfiguration();
    }

    // ---------------------------------------------------------------
    // MBean metadata
    // ---------------------------------------------------------------

    /**
     * Returns the default {@link StandardMBean} metadata with the operations section
     * replaced by the one {@link MBeanInfoHelper} derives from
     * {@link ConnectionFactoryControl}.
     */
    @Override
    public MBeanInfo getMBeanInfo() {
        final MBeanInfo defaults = super.getMBeanInfo();
        return new MBeanInfo(defaults.getClassName(),
                             defaults.getDescription(),
                             defaults.getAttributes(),
                             defaults.getConstructors(),
                             MBeanInfoHelper.getMBeanOperationsInfo(ConnectionFactoryControl.class),
                             defaults.getNotifications());
    }

    // ---------------------------------------------------------------
    // Internals
    // ---------------------------------------------------------------

    /**
     * Asks the server manager to recreate the connection factory from the current
     * configuration and keeps the returned instance.
     *
     * @throws RuntimeException wrapping (with cause) any exception raised by the recreation
     */
    private void applyConfiguration() {
        try {
            this.cf = this.jmsManager.recreateCF(this.name, this.cfConfig);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSQueueControlImpl.java
// ----------------------------------------------------------------------
// diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSQueueControlImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSQueueControlImpl.java
// new file mode 100644 index 0000000..f15eaff --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSQueueControlImpl.java @@ -0,0 +1,389 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.jms.management.impl;

import javax.management.MBeanInfo;
import javax.management.StandardMBean;
import java.util.Map;

import org.apache.activemq6.api.core.FilterConstants;
import org.apache.activemq6.api.core.HornetQException;
import org.apache.activemq6.api.core.management.MessageCounterInfo;
import org.apache.activemq6.api.core.management.Operation;
import org.apache.activemq6.api.core.management.QueueControl;
import org.apache.activemq6.api.jms.management.JMSQueueControl;
import org.apache.activemq6.core.management.impl.MBeanInfoHelper;
import org.apache.activemq6.core.messagecounter.MessageCounter;
import org.apache.activemq6.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq6.jms.client.HornetQDestination;
import org.apache.activemq6.jms.client.HornetQMessage;
import org.apache.activemq6.jms.client.SelectorTranslator;
import org.apache.activemq6.jms.server.JMSServerManager;
import org.apache.activemq6.utils.json.JSONArray;
import org.apache.activemq6.utils.json.JSONObject;

/**
 * Management (MBean) facade over one JMS queue.
 * <p>
 * Destination-level attributes (name, address, temporary flag) are read from the
 * managed {@link HornetQDestination}; message and consumer operations are delegated
 * to the core {@link QueueControl}, after translating JMS selectors / JMS message IDs
 * into core filter strings; JNDI operations are delegated to the
 * {@link JMSServerManager}.
 *
 * @author Jeff Mesnil
 */
public class JMSQueueControlImpl extends StandardMBean implements JMSQueueControl
{
   private final HornetQDestination managedQueue;

   private final JMSServerManager jmsServerManager;

   private final QueueControl coreQueueControl;

   private final MessageCounter counter;

   // Static --------------------------------------------------------

   /**
    * Translates a JMS selector into a core filter string.
    *
    * @param selectorStr JMS selector, may be {@code null}
    * @return the translated filter, or {@code null} if the selector is {@code null},
    *         empty or whitespace only
    * @throws HornetQException as declared; raised by the selector translation if at all
    */
   public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
   {
      if (selectorStr == null || selectorStr.trim().length() == 0)
      {
         return null;
      }
      return SelectorTranslator.convertToHornetQFilterString(selectorStr);
   }

   /**
    * Builds a core filter that matches a message by its user ID.
    * <p>
    * The ID is embedded as a quoted string literal, so any single quote it contains
    * is doubled; without that, an ID containing {@code '} would produce a malformed
    * filter or let the caller append arbitrary filter clauses.
    * NOTE(review): assumes the core filter grammar accepts {@code ''} as an escaped
    * quote, as JMS selector syntax does - confirm against the filter parser.
    *
    * @param jmsMessageID message ID supplied by the management caller; a {@code null}
    *                     is rendered as the literal text {@code null}, as before
    */
   private static String createFilterForJMSMessageID(final String jmsMessageID) throws Exception
   {
      String escapedID = String.valueOf(jmsMessageID).replace("'", "''");
      return FilterConstants.HORNETQ_USERID + " = '" + escapedID + "'";
   }

   /**
    * Serialises an array of message maps as a JSON array of JSON objects.
    */
   static String toJSON(final Map<String, Object>[] messages)
   {
      JSONArray array = new JSONArray();
      for (Map<String, Object> message : messages)
      {
         array.put(new JSONObject(message));
      }
      return array.toString();
   }

   /**
    * Shared post-condition of the single-message operations: the core operation must
    * have affected exactly one message.
    *
    * @param affected  number of messages the core operation reported
    * @param messageID ID the caller asked for, used in the error message
    * @return always {@code true}
    * @throws IllegalArgumentException if {@code affected} is not 1
    */
   private static boolean requireSingleMatch(final int affected, final String messageID)
   {
      if (affected != 1)
      {
         throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
      }
      return true;
   }

   // Constructors --------------------------------------------------

   /**
    * @param managedQueue     JMS destination exposed by this control
    * @param coreQueueControl core control that performs the message operations
    * @param jmsServerManager server manager used for JNDI operations
    * @param counter          message counter reported by the {@code listMessageCounter*} operations
    */
   public JMSQueueControlImpl(final HornetQDestination managedQueue,
                              final QueueControl coreQueueControl,
                              final JMSServerManager jmsServerManager,
                              final MessageCounter counter) throws Exception
   {
      super(JMSQueueControl.class);
      this.managedQueue = managedQueue;
      this.jmsServerManager = jmsServerManager;
      this.coreQueueControl = coreQueueControl;
      this.counter = counter;
   }

   // Public --------------------------------------------------------

   // ManagedJMSQueueMBean implementation ---------------------------

   public String getName()
   {
      return managedQueue.getName();
   }

   public String getAddress()
   {
      return managedQueue.getAddress();
   }

   public boolean isTemporary()
   {
      return managedQueue.isTemporary();
   }

   public long getMessageCount()
   {
      return coreQueueControl.getMessageCount();
   }

   public long getMessagesAdded()
   {
      return coreQueueControl.getMessagesAdded();
   }

   public int getConsumerCount()
   {
      return coreQueueControl.getConsumerCount();
   }

   public int getDeliveringCount()
   {
      return coreQueueControl.getDeliveringCount();
   }

   public long getScheduledCount()
   {
      return coreQueueControl.getScheduledCount();
   }

   public boolean isDurable()
   {
      return coreQueueControl.isDurable();
   }

   public String getDeadLetterAddress()
   {
      return coreQueueControl.getDeadLetterAddress();
   }

   public void setDeadLetterAddress(final String deadLetterAddress) throws Exception
   {
      coreQueueControl.setDeadLetterAddress(deadLetterAddress);
   }

   public String getExpiryAddress()
   {
      return coreQueueControl.getExpiryAddress();
   }

   public void setExpiryAddress(final String expiryAddress) throws Exception
   {
      coreQueueControl.setExpiryAddress(expiryAddress);
   }

   @Override
   public void addJNDI(String jndi) throws Exception
   {
      jmsServerManager.addQueueToJndi(managedQueue.getName(), jndi);
   }

   public void removeJNDI(String jndi) throws Exception
   {
      jmsServerManager.removeQueueFromJNDI(managedQueue.getName(), jndi);
   }

   public String[] getJNDIBindings()
   {
      return jmsServerManager.getJNDIOnQueue(managedQueue.getName());
   }

   /**
    * Removes the message with the given ID.
    *
    * @return always {@code true}
    * @throws IllegalArgumentException if the core control did not remove exactly one message
    */
   public boolean removeMessage(final String messageID) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
      int removed = coreQueueControl.removeMessages(filter);
      return requireSingleMatch(removed, messageID);
   }

   /**
    * Removes the messages matching the JMS selector.
    *
    * @return the count reported by the core control
    */
   public int removeMessages(final String filterStr) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
      return coreQueueControl.removeMessages(filter);
   }

   /**
    * Lists the messages matching the JMS selector, converted from their core
    * representation to JMS-style maps.
    *
    * @throws IllegalStateException if a {@link HornetQException} is raised; the
    *                               original exception is kept as the cause
    */
   public Map<String, Object>[] listMessages(final String filterStr) throws Exception
   {
      try
      {
         String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
         Map<String, Object>[] coreMessages = coreQueueControl.listMessages(filter);

         // Generic arrays cannot be created directly; every slot is filled below
         // with a Map<String, Object>, so the unchecked conversion is safe.
         @SuppressWarnings("unchecked")
         Map<String, Object>[] jmsMessages = new Map[coreMessages.length];

         for (int i = 0; i < coreMessages.length; i++)
         {
            jmsMessages[i] = HornetQMessage.coreMaptoJMSMap(coreMessages[i]);
         }
         return jmsMessages;
      }
      catch (HornetQException e)
      {
         // Keep the cause so the management caller can see the underlying failure.
         throw new IllegalStateException(e.getMessage(), e);
      }
   }

   public String listMessagesAsJSON(final String filter) throws Exception
   {
      return JMSQueueControlImpl.toJSON(listMessages(filter));
   }

   public long countMessages(final String filterStr) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
      return coreQueueControl.countMessages(filter);
   }

   /**
    * Expires the message with the given ID.
    *
    * @return always {@code true}
    * @throws IllegalArgumentException if the core control did not expire exactly one message
    */
   public boolean expireMessage(final String messageID) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
      int expired = coreQueueControl.expireMessages(filter);
      return requireSingleMatch(expired, messageID);
   }

   public int expireMessages(final String filterStr) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
      return coreQueueControl.expireMessages(filter);
   }

   /**
    * Sends the message with the given ID to the dead letter address.
    *
    * @return always {@code true}
    * @throws IllegalArgumentException if the core control did not handle exactly one message
    */
   public boolean sendMessageToDeadLetterAddress(final String messageID) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
      int dead = coreQueueControl.sendMessagesToDeadLetterAddress(filter);
      return requireSingleMatch(dead, messageID);
   }

   public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
      return coreQueueControl.sendMessagesToDeadLetterAddress(filter);
   }

   /**
    * Changes the priority of the message with the given ID.
    *
    * @return always {@code true}
    * @throws IllegalArgumentException if the core control did not change exactly one message
    */
   public boolean changeMessagePriority(final String messageID, final int newPriority) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
      int changed = coreQueueControl.changeMessagesPriority(filter, newPriority);
      return requireSingleMatch(changed, messageID);
   }

   public int changeMessagesPriority(final String filterStr, final int newPriority) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
      return coreQueueControl.changeMessagesPriority(filter, newPriority);
   }

   /** Same as {@link #moveMessage(String, String, boolean)} with {@code rejectDuplicates = false}. */
   public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception
   {
      return moveMessage(messageID, otherQueueName, false);
   }

   /**
    * Moves the message with the given ID to the address of another JMS queue.
    *
    * @param otherQueueName   JMS name of the target queue; converted to a core address
    * @param rejectDuplicates passed through to the core control
    * @return always {@code true}
    * @throws IllegalArgumentException if the core control did not move exactly one message
    */
   public boolean moveMessage(final String messageID, final String otherQueueName, final boolean rejectDuplicates) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
      HornetQDestination otherQueue = HornetQDestination.createQueue(otherQueueName);
      int moved = coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates);
      return requireSingleMatch(moved, messageID);
   }

   /**
    * Moves the messages matching the JMS selector to the address of another JMS queue.
    *
    * @return the count reported by the core control
    */
   public int moveMessages(final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception
   {
      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
      HornetQDestination otherQueue = HornetQDestination.createQueue(otherQueueName);
      return coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates);
   }

   /** Same as {@link #moveMessages(String, String, boolean)} with {@code rejectDuplicates = false}. */
   public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
   {
      return moveMessages(filterStr, otherQueueName, false);
   }

   @Operation(desc = "List all the existent consumers on the Queue")
   public String listConsumersAsJSON() throws Exception
   {
      return coreQueueControl.listConsumersAsJSON();
   }

   /**
    * @throws IllegalStateException wrapping any exception raised while serialising the counter
    */
   public String listMessageCounter()
   {
      try
      {
         return MessageCounterInfo.toJSon(counter);
      }
      catch (Exception e)
      {
         throw new IllegalStateException(e);
      }
   }

   public void resetMessageCounter() throws Exception
   {
      coreQueueControl.resetMessageCounter();
   }

   public String listMessageCounterAsHTML()
   {
      return MessageCounterHelper.listMessageCounterAsHTML(new MessageCounter[]{counter});
   }

   public String listMessageCounterHistory() throws Exception
   {
      return MessageCounterHelper.listMessageCounterHistory(counter);
   }

   public String listMessageCounterHistoryAsHTML()
   {
      return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[]{counter});
   }

   public boolean isPaused() throws Exception
   {
      return coreQueueControl.isPaused();
   }

   public void pause() throws Exception
   {
      coreQueueControl.pause();
   }

   public void resume() throws Exception
   {
      coreQueueControl.resume();
   }

   /** Returns the filter of the underlying core queue. */
   public String getSelector()
   {
      return coreQueueControl.getFilter();
   }

   public void flushExecutor()
   {
      coreQueueControl.flushExecutor();
   }

   /**
    * Returns the default {@link StandardMBean} metadata with the operations section
    * replaced by the one {@link MBeanInfoHelper} derives from {@link JMSQueueControl}.
    */
   @Override
   public MBeanInfo getMBeanInfo()
   {
      MBeanInfo info = super.getMBeanInfo();
      return new MBeanInfo(info.getClassName(),
                           info.getDescription(),
                           info.getAttributes(),
                           info.getConstructors(),
                           MBeanInfoHelper.getMBeanOperationsInfo(JMSQueueControl.class),
                           info.getNotifications());
   }
}
// Patch header of the next file in this archive, kept verbatim (its licence comment continues on the following line):
// http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSServerControlImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSServerControlImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSServerControlImpl.java new file mode 100644 index 0000000..aa8da34 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSServerControlImpl.java @@ -0,0 +1,1049 @@ +/* + * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.management.impl; + +import javax.jms.JMSRuntimeException; +import javax.management.ListenerNotFoundException; +import javax.management.MBeanNotificationInfo; +import javax.management.MBeanOperationInfo; +import javax.management.Notification; +import javax.management.NotificationBroadcasterSupport; +import javax.management.NotificationEmitter; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq6.api.core.management.Parameter; +import org.apache.activemq6.api.jms.JMSFactoryType; +import org.apache.activemq6.api.jms.management.ConnectionFactoryControl; +import org.apache.activemq6.api.jms.management.DestinationControl; +import org.apache.activemq6.api.jms.management.JMSQueueControl; +import org.apache.activemq6.api.jms.management.JMSServerControl; +import org.apache.activemq6.api.jms.management.TopicControl; +import org.apache.activemq6.core.filter.Filter; +import org.apache.activemq6.core.management.impl.AbstractControl; +import org.apache.activemq6.core.management.impl.MBeanInfoHelper; +import org.apache.activemq6.core.server.ServerConsumer; +import org.apache.activemq6.core.server.ServerSession; +import 
org.apache.activemq6.jms.client.HornetQDestination; +import org.apache.activemq6.jms.server.HornetQJMSServerLogger; +import org.apache.activemq6.jms.server.JMSServerManager; +import org.apache.activemq6.jms.server.config.ConnectionFactoryConfiguration; +import org.apache.activemq6.jms.server.config.impl.ConnectionFactoryConfigurationImpl; +import org.apache.activemq6.jms.server.management.JMSNotificationType; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; +import org.apache.activemq6.utils.TypedProperties; +import org.apache.activemq6.utils.json.JSONArray; +import org.apache.activemq6.utils.json.JSONObject; + +/** + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public class JMSServerControlImpl extends AbstractControl implements JMSServerControl, NotificationEmitter, + org.apache.activemq6.core.server.management.NotificationListener +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + private final JMSServerManager server; + + private final NotificationBroadcasterSupport broadcaster; + + private final AtomicLong notifSeq = new AtomicLong(0); + + // Static -------------------------------------------------------- + + private static String[] convert(final Object[] jndiBindings) + { + String[] bindings = new String[jndiBindings.length]; + for (int i = 0, jndiBindingsLength = jndiBindings.length; i < jndiBindingsLength; i++) + { + bindings[i] = jndiBindings[i].toString().trim(); + } + return bindings; + } + + private static String[] toArray(final String commaSeparatedString) + { + if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0) + { + return new String[0]; + } + String[] values = commaSeparatedString.split(","); + String[] trimmed = new String[values.length]; + for (int i = 0; i < values.length; i++) + { + trimmed[i] = values[i].trim(); + 
trimmed[i] = trimmed[i].replace(",", ","); + } + return trimmed; + } + + private static String[] determineJMSDestination(String coreAddress) + { + String[] result = new String[2]; // destination name & type + if (coreAddress.startsWith(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX)) + { + result[0] = coreAddress.substring(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()); + result[1] = "queue"; + } + else if (coreAddress.startsWith(HornetQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX)) + { + result[0] = coreAddress.substring(HornetQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length()); + result[1] = "tempqueue"; + } + else if (coreAddress.startsWith(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX)) + { + result[0] = coreAddress.substring(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()); + result[1] = "topic"; + } + else if (coreAddress.startsWith(HornetQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX)) + { + result[0] = coreAddress.substring(HornetQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length()); + result[1] = "temptopic"; + } + else + { + HornetQJMSServerLogger.LOGGER.debug("JMSServerControlImpl.determineJMSDestination()" + coreAddress); + // not related to JMS + return null; + } + return result; + } + + public static MBeanNotificationInfo[] getNotificationInfos() + { + JMSNotificationType[] values = JMSNotificationType.values(); + String[] names = new String[values.length]; + for (int i = 0; i < values.length; i++) + { + names[i] = values[i].toString(); + } + return new MBeanNotificationInfo[]{new MBeanNotificationInfo(names, + JMSServerControl.class.getName(), + "Notifications emitted by a JMS Server")}; + } + + // Constructors -------------------------------------------------- + + public JMSServerControlImpl(final JMSServerManager server) throws Exception + { + super(JMSServerControl.class, server.getHornetQServer().getStorageManager()); + this.server = server; + broadcaster = new NotificationBroadcasterSupport(); + 
server.getHornetQServer().getManagementService().addNotificationListener(this); + } + + // Public -------------------------------------------------------- + + // JMSServerControlMBean implementation -------------------------- + + /** + * See the interface definition for the javadoc. + */ + public void createConnectionFactory(String name, + boolean ha, + boolean useDiscovery, + int cfType, + String[] connectorNames, + Object[] bindings) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + if (useDiscovery) + { + if (connectorNames == null || connectorNames.length == 0) + { + throw new IllegalArgumentException("no discovery group name supplied"); + } + server.createConnectionFactory(name, + ha, + JMSFactoryType.valueOf(cfType), + connectorNames[0], + JMSServerControlImpl.convert(bindings)); + } + else + { + List<String> connectorList = new ArrayList<String>(connectorNames.length); + + for (String str : connectorNames) + { + connectorList.add(str); + } + + server.createConnectionFactory(name, + ha, + JMSFactoryType.valueOf(cfType), + connectorList, + JMSServerControlImpl.convert(bindings)); + } + } + finally + { + blockOnIO(); + } + } + + @Override + public void createConnectionFactory(String name, + boolean ha, + boolean useDiscovery, + int cfType, + String connectors, + String jndiBindings, + String clientID, + long clientFailureCheckPeriod, + long connectionTTL, + long callTimeout, + long callFailoverTimeout, + int minLargeMessageSize, + boolean compressLargeMessages, + int consumerWindowSize, + int consumerMaxRate, + int confirmationWindowSize, + int producerWindowSize, + int producerMaxRate, + boolean blockOnAcknowledge, + boolean blockOnDurableSend, + boolean blockOnNonDurableSend, + boolean autoGroup, + boolean preAcknowledge, + String loadBalancingPolicyClassName, + int transactionBatchSize, + int dupsOKBatchSize, + boolean useGlobalPools, + int scheduledThreadPoolMaxSize, + int threadPoolMaxSize, + long retryInterval, + double 
retryIntervalMultiplier, + long maxRetryInterval, + int reconnectAttempts, + boolean failoverOnInitialConnection, + String groupId) throws Exception + { + createConnectionFactory(name, + ha, + useDiscovery, + cfType, + toArray(connectors), + toArray(jndiBindings), + clientID, + clientFailureCheckPeriod, + connectionTTL, + callTimeout, + callFailoverTimeout, + minLargeMessageSize, + compressLargeMessages, + consumerWindowSize, + consumerMaxRate, + confirmationWindowSize, + producerWindowSize, + producerMaxRate, + blockOnAcknowledge, + blockOnDurableSend, + blockOnNonDurableSend, + autoGroup, + preAcknowledge, + loadBalancingPolicyClassName, + transactionBatchSize, + dupsOKBatchSize, + useGlobalPools, + scheduledThreadPoolMaxSize, + threadPoolMaxSize, + retryInterval, + retryIntervalMultiplier, + maxRetryInterval, + reconnectAttempts, + failoverOnInitialConnection, + groupId); + } + + @Override + public void createConnectionFactory(String name, + boolean ha, + boolean useDiscovery, + int cfType, + String[] connectorNames, + String[] bindings, + String clientID, + long clientFailureCheckPeriod, + long connectionTTL, + long callTimeout, + long callFailoverTimeout, + int minLargeMessageSize, + boolean compressLargeMessages, + int consumerWindowSize, + int consumerMaxRate, + int confirmationWindowSize, + int producerWindowSize, + int producerMaxRate, + boolean blockOnAcknowledge, + boolean blockOnDurableSend, + boolean blockOnNonDurableSend, + boolean autoGroup, + boolean preAcknowledge, + String loadBalancingPolicyClassName, + int transactionBatchSize, + int dupsOKBatchSize, + boolean useGlobalPools, + int scheduledThreadPoolMaxSize, + int threadPoolMaxSize, + long retryInterval, + double retryIntervalMultiplier, + long maxRetryInterval, + int reconnectAttempts, + boolean failoverOnInitialConnection, + String groupId) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + ConnectionFactoryConfiguration configuration = new 
ConnectionFactoryConfigurationImpl() + .setName(name) + .setHA(ha) + .setBindings(bindings) + .setFactoryType(JMSFactoryType.valueOf(cfType)) + .setClientID(clientID) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setConnectionTTL(connectionTTL) + .setCallTimeout(callTimeout) + .setCallFailoverTimeout(callFailoverTimeout) + .setMinLargeMessageSize(minLargeMessageSize) + .setCompressLargeMessages(compressLargeMessages) + .setConsumerWindowSize(consumerWindowSize) + .setConsumerMaxRate(consumerMaxRate) + .setConfirmationWindowSize(confirmationWindowSize) + .setProducerWindowSize(producerWindowSize) + .setProducerMaxRate(producerMaxRate) + .setBlockOnAcknowledge(blockOnAcknowledge) + .setBlockOnDurableSend(blockOnDurableSend) + .setBlockOnNonDurableSend(blockOnNonDurableSend) + .setAutoGroup(autoGroup) + .setPreAcknowledge(preAcknowledge) + .setTransactionBatchSize(transactionBatchSize) + .setDupsOKBatchSize(dupsOKBatchSize) + .setUseGlobalPools(useGlobalPools) + .setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize) + .setThreadPoolMaxSize(threadPoolMaxSize) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setMaxRetryInterval(maxRetryInterval) + .setReconnectAttempts(reconnectAttempts) + .setFailoverOnInitialConnection(failoverOnInitialConnection) + .setGroupID(groupId); + + if (useDiscovery) + { + configuration.setDiscoveryGroupName(connectorNames[0]); + } + else + { + ArrayList<String> connectorNamesList = new ArrayList<String>(); + for (String nameC : connectorNames) + { + connectorNamesList.add(nameC); + } + configuration.setConnectorNames(connectorNamesList); + } + + if (loadBalancingPolicyClassName != null && !loadBalancingPolicyClassName.trim().equals("")) + { + configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName); + } + + server.createConnectionFactory(true, configuration, bindings); + } + finally + { + blockOnIO(); + } + } + + /** + * Create a JMS ConnectionFactory with the 
specified name connected to a single live-backup pair of servers. + * <br> + * The ConnectionFactory is bound to JNDI for all the specified bindings Strings. + */ + public void createConnectionFactory(String name, + boolean ha, + boolean useDiscovery, + int cfType, + String connectors, + String jndiBindings) throws Exception + { + createConnectionFactory(name, ha, useDiscovery, cfType, toArray(connectors), toArray(jndiBindings)); + } + + public boolean createQueue(final String name) throws Exception + { + return createQueue(name, null, null, true); + } + + public boolean createQueue(final String name, final String jndiBindings) throws Exception + { + return createQueue(name, jndiBindings, null, true); + } + + @Override + public boolean createQueue(String name, String jndiBindings, String selector) throws Exception + { + return createQueue(name, jndiBindings, selector, true); + } + + public boolean createQueue(@Parameter(name = "name", desc = "Name of the queue to create") String name, + @Parameter(name = "jndiBindings", desc = "comma-separated list of JNDI bindings (use ',' if u need to use commas in your jndi name)") String jndiBindings, + @Parameter(name = "selector", desc = "the jms selector") String selector, + @Parameter(name = "durable", desc = "is the queue persistent and resilient to restart") boolean durable) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.createQueue(true, name, selector, durable, + JMSServerControlImpl.toArray(jndiBindings)); + } + finally + { + blockOnIO(); + } + } + + public boolean destroyQueue(final String name) throws Exception + { + return destroyQueue(name, false); + } + + public boolean destroyQueue(final String name, final boolean removeConsumers) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.destroyQueue(name, removeConsumers); + } + finally + { + blockOnIO(); + } + } + + public boolean createTopic(String name) throws Exception + { + return createTopic(name, 
null); + } + + public boolean createTopic(final String topicName, final String jndiBindings) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.createTopic(true, topicName, JMSServerControlImpl.toArray(jndiBindings)); + } + finally + { + blockOnIO(); + } + } + + public boolean destroyTopic(final String name) throws Exception + { + return destroyTopic(name, true); + } + + + public boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.destroyTopic(name, removeConsumers); + } + finally + { + blockOnIO(); + } + } + + public void destroyConnectionFactory(final String name) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + server.destroyConnectionFactory(name); + } + finally + { + blockOnIO(); + } + } + + public boolean isStarted() + { + return server.isStarted(); + } + + public String getVersion() + { + checkStarted(); + + return server.getVersion(); + } + + public String[] getQueueNames() + { + checkStarted(); + + clearIO(); + + try + { + Object[] queueControls = server.getHornetQServer().getManagementService().getResources(JMSQueueControl.class); + String[] names = new String[queueControls.length]; + for (int i = 0; i < queueControls.length; i++) + { + JMSQueueControl queueControl = (JMSQueueControl) queueControls[i]; + names[i] = queueControl.getName(); + } + return names; + } + finally + { + blockOnIO(); + } + } + + public String[] getTopicNames() + { + checkStarted(); + + clearIO(); + + try + { + Object[] topicControls = server.getHornetQServer().getManagementService().getResources(TopicControl.class); + String[] names = new String[topicControls.length]; + for (int i = 0; i < topicControls.length; i++) + { + TopicControl topicControl = (TopicControl) topicControls[i]; + names[i] = topicControl.getName(); + } + return names; + } + finally + { + blockOnIO(); + } + } + + public String[] getConnectionFactoryNames() + { + 
checkStarted(); + + clearIO(); + + try + { + Object[] cfControls = server.getHornetQServer() + .getManagementService() + .getResources(ConnectionFactoryControl.class); + String[] names = new String[cfControls.length]; + for (int i = 0; i < cfControls.length; i++) + { + ConnectionFactoryControl cfControl = (ConnectionFactoryControl) cfControls[i]; + names[i] = cfControl.getName(); + } + return names; + } + finally + { + blockOnIO(); + } + } + + // NotificationEmitter implementation ---------------------------- + + public void removeNotificationListener(final NotificationListener listener, + final NotificationFilter filter, + final Object handback) throws ListenerNotFoundException + { + broadcaster.removeNotificationListener(listener, filter, handback); + } + + public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException + { + broadcaster.removeNotificationListener(listener); + } + + public void addNotificationListener(final NotificationListener listener, + final NotificationFilter filter, + final Object handback) throws IllegalArgumentException + { + broadcaster.addNotificationListener(listener, filter, handback); + } + + public MBeanNotificationInfo[] getNotificationInfo() + { + return JMSServerControlImpl.getNotificationInfos(); + } + + public String[] listRemoteAddresses() throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.listRemoteAddresses(); + } + finally + { + blockOnIO(); + } + } + + public String[] listRemoteAddresses(final String ipAddress) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.listRemoteAddresses(ipAddress); + } + finally + { + blockOnIO(); + } + } + + public boolean closeConnectionsForAddress(final String ipAddress) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.closeConnectionsForAddress(ipAddress); + } + finally + { + blockOnIO(); + } + } + + public boolean 
closeConsumerConnectionsForAddress(final String address) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.closeConsumerConnectionsForAddress(address); + } + finally + { + blockOnIO(); + } + } + + public boolean closeConnectionsForUser(final String userName) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.closeConnectionsForUser(userName); + } + finally + { + blockOnIO(); + } + } + + public String[] listConnectionIDs() throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.listConnectionIDs(); + } + finally + { + blockOnIO(); + } + } + + public String listConnectionsAsJSON() throws Exception + { + checkStarted(); + + clearIO(); + + try + { + JSONArray array = new JSONArray(); + + Set<RemotingConnection> connections = server.getHornetQServer().getRemotingService().getConnections(); + + Set<ServerSession> sessions = server.getHornetQServer().getSessions(); + + Map<Object, ServerSession> jmsSessions = new HashMap<Object, ServerSession>(); + + for (ServerSession session : sessions) + { + if (session.getMetaData("jms-session") != null) + { + jmsSessions.put(session.getConnectionID(), session); + } + } + + for (RemotingConnection connection : connections) + { + ServerSession session = jmsSessions.get(connection.getID()); + if (session != null) + { + JSONObject obj = new JSONObject(); + obj.put("connectionID", connection.getID().toString()); + obj.put("clientAddress", connection.getRemoteAddress()); + obj.put("creationTime", connection.getCreationTime()); + obj.put("clientID", session.getMetaData("jms-client-id")); + obj.put("principal", session.getUsername()); + array.put(obj); + } + } + return array.toString(); + } + finally + { + blockOnIO(); + } + } + + public String listConsumersAsJSON(String connectionID) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + JSONArray array = new JSONArray(); + + Set<RemotingConnection> connections = 
server.getHornetQServer().getRemotingService().getConnections(); + for (RemotingConnection connection : connections) + { + if (connectionID.equals(connection.getID().toString())) + { + List<ServerSession> sessions = server.getHornetQServer().getSessions(connectionID); + for (ServerSession session : sessions) + { + Set<ServerConsumer> consumers = session.getServerConsumers(); + for (ServerConsumer consumer : consumers) + { + JSONObject obj = toJSONObject(consumer); + if (obj != null) + { + array.put(obj); + } + } + } + } + } + return array.toString(); + } + finally + { + blockOnIO(); + } + } + + public String listAllConsumersAsJSON() throws Exception + { + checkStarted(); + + clearIO(); + + try + { + JSONArray array = new JSONArray(); + + Set<ServerSession> sessions = server.getHornetQServer().getSessions(); + for (ServerSession session : sessions) + { + Set<ServerConsumer> consumers = session.getServerConsumers(); + for (ServerConsumer consumer : consumers) + { + JSONObject obj = toJSONObject(consumer); + if (obj != null) + { + array.put(obj); + } + } + } + return array.toString(); + } + finally + { + blockOnIO(); + } + } + + public String[] listSessions(final String connectionID) throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.listSessions(connectionID); + } + finally + { + blockOnIO(); + } + } + + public String listPreparedTransactionDetailsAsJSON() throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.listPreparedTransactionDetailsAsJSON(); + } + finally + { + blockOnIO(); + } + } + + public String listPreparedTransactionDetailsAsHTML() throws Exception + { + checkStarted(); + + clearIO(); + + try + { + return server.listPreparedTransactionDetailsAsHTML(); + } + finally + { + blockOnIO(); + } + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + /* (non-Javadoc) + * @see 
org.apache.activemq6.core.management.impl.AbstractControl#fillMBeanOperationInfo() + */ + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() + { + return MBeanInfoHelper.getMBeanOperationsInfo(JMSServerControl.class); + } + + // Private ------------------------------------------------------- + + private void checkStarted() + { + if (!server.isStarted()) + { + throw new IllegalStateException("HornetQ JMS Server is not started. it can not be managed yet"); + } + } + + // Inner classes ------------------------------------------------- + + public String[] listTargetDestinations(String sessionID) throws Exception + { + String[] addresses = server.getHornetQServer().getHornetQServerControl().listTargetAddresses(sessionID); + Map<String, DestinationControl> allDests = new HashMap<String, DestinationControl>(); + + Object[] queueControls = server.getHornetQServer().getManagementService().getResources(JMSQueueControl.class); + for (Object queueControl2 : queueControls) + { + JMSQueueControl queueControl = (JMSQueueControl) queueControl2; + allDests.put(queueControl.getAddress(), queueControl); + } + + Object[] topicControls = server.getHornetQServer().getManagementService().getResources(TopicControl.class); + for (Object topicControl2 : topicControls) + { + TopicControl topicControl = (TopicControl) topicControl2; + allDests.put(topicControl.getAddress(), topicControl); + } + + List<String> destinations = new ArrayList<String>(); + for (String addresse : addresses) + { + DestinationControl control = allDests.get(addresse); + if (control != null) + { + destinations.add(control.getAddress()); + } + } + return destinations.toArray(new String[0]); + } + + public String getLastSentMessageID(String sessionID, String address) throws Exception + { + ServerSession session = server.getHornetQServer().getSessionByID(sessionID); + if (session != null) + { + return session.getLastSentMessageID(address); + } + return null; + } + + public String 
getSessionCreationTime(String sessionID) throws Exception + { + ServerSession session = server.getHornetQServer().getSessionByID(sessionID); + if (session != null) + { + return String.valueOf(session.getCreationTime()); + } + return null; + } + + public String listSessionsAsJSON(final String connectionID) throws Exception + { + checkStarted(); + + clearIO(); + + JSONArray array = new JSONArray(); + try + { + List<ServerSession> sessions = server.getHornetQServer().getSessions(connectionID); + for (ServerSession sess : sessions) + { + JSONObject obj = new JSONObject(); + obj.put("sessionID", sess.getName()); + obj.put("creationTime", sess.getCreationTime()); + array.put(obj); + } + } + finally + { + blockOnIO(); + } + return array.toString(); + } + + public String closeConnectionWithClientID(final String clientID) throws Exception + { + return server.getHornetQServer().destroyConnectionWithSessionMetadata("jms-client-id", clientID); + } + + private JSONObject toJSONObject(ServerConsumer consumer) throws Exception + { + JSONObject obj = new JSONObject(); + obj.put("consumerID", consumer.getID()); + obj.put("connectionID", consumer.getConnectionID()); + obj.put("sessionID", consumer.getSessionID()); + obj.put("queueName", consumer.getQueue().getName().toString()); + obj.put("browseOnly", consumer.isBrowseOnly()); + obj.put("creationTime", consumer.getCreationTime()); + // JMS consumer with message filter use the queue's filter + Filter queueFilter = consumer.getQueue().getFilter(); + if (queueFilter != null) + { + obj.put("filter", queueFilter.getFilterString().toString()); + } + String[] destinationInfo = determineJMSDestination(consumer.getQueue().getAddress().toString()); + if (destinationInfo == null) + { + return null; + } + obj.put("destinationName", destinationInfo[0]); + obj.put("destinationType", destinationInfo[1]); + if (destinationInfo[1].equals("topic")) + { + try + { + 
HornetQDestination.decomposeQueueNameForDurableSubscription(consumer.getQueue().getName().toString()); + obj.put("durable", true); + } + catch (IllegalArgumentException e) + { + obj.put("durable", false); + } + catch (JMSRuntimeException e) + { + obj.put("durable", false); + } + } + else + { + obj.put("durable", false); + } + + return obj; + } + + @Override + public void onNotification(org.apache.activemq6.core.server.management.Notification notification) + { + if (!(notification.getType() instanceof JMSNotificationType)) return; + JMSNotificationType type = (JMSNotificationType) notification.getType(); + TypedProperties prop = notification.getProperties(); + + this.broadcaster.sendNotification(new Notification(type.toString(), this, + notifSeq.incrementAndGet(), prop.getSimpleStringProperty(JMSNotificationType.MESSAGE).toString())); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSTopicControlImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSTopicControlImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSTopicControlImpl.java new file mode 100644 index 0000000..53730cc --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSTopicControlImpl.java @@ -0,0 +1,417 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.management.impl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.management.MBeanInfo; +import javax.management.StandardMBean; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.management.AddressControl; +import org.apache.activemq6.api.core.management.HornetQServerControl; +import org.apache.activemq6.api.core.management.QueueControl; +import org.apache.activemq6.api.core.management.ResourceNames; +import org.apache.activemq6.api.jms.management.TopicControl; +import org.apache.activemq6.core.management.impl.MBeanInfoHelper; +import org.apache.activemq6.core.server.management.ManagementService; +import org.apache.activemq6.jms.client.HornetQDestination; +import org.apache.activemq6.jms.client.HornetQMessage; +import org.apache.activemq6.jms.client.SelectorTranslator; +import org.apache.activemq6.jms.server.JMSServerManager; +import org.apache.activemq6.utils.json.JSONArray; +import org.apache.activemq6.utils.json.JSONObject; + +/** + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * + * + */ +public class JMSTopicControlImpl extends StandardMBean implements TopicControl +{ + private final HornetQDestination managedTopic; + + private final AddressControl addressControl; + + private final ManagementService managementService; + + private final JMSServerManager jmsServerManager; + + // Static 
-------------------------------------------------------- + + public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException + { + return selectorStr == null || selectorStr.trim().length() == 0 ? null + : SelectorTranslator.convertToHornetQFilterString(selectorStr); + } + + // Constructors -------------------------------------------------- + + public JMSTopicControlImpl(final HornetQDestination topic, + final JMSServerManager jmsServerManager, + final AddressControl addressControl, + final ManagementService managementService) throws Exception + { + super(TopicControl.class); + this.jmsServerManager = jmsServerManager; + managedTopic = topic; + this.addressControl = addressControl; + this.managementService = managementService; + } + + // TopicControlMBean implementation ------------------------------ + + @Override + public void addJNDI(String jndi) throws Exception + { + jmsServerManager.addTopicToJndi(managedTopic.getName(), jndi); + } + + public String[] getJNDIBindings() + { + return jmsServerManager.getJNDIOnTopic(managedTopic.getName()); + } + + public String getName() + { + return managedTopic.getName(); + } + + public boolean isTemporary() + { + return managedTopic.isTemporary(); + } + + public String getAddress() + { + return managedTopic.getAddress(); + } + + public long getMessageCount() + { + return getMessageCount(DurabilityType.ALL); + } + + public int getDeliveringCount() + { + List<QueueControl> queues = getQueues(DurabilityType.ALL); + int count = 0; + for (QueueControl queue : queues) + { + count += queue.getDeliveringCount(); + } + return count; + } + + public long getMessagesAdded() + { + List<QueueControl> queues = getQueues(DurabilityType.ALL); + int count = 0; + for (QueueControl queue : queues) + { + count += queue.getMessagesAdded(); + } + return count; + } + + public int getDurableMessageCount() + { + return getMessageCount(DurabilityType.DURABLE); + } + + public int getNonDurableMessageCount() + { + return 
getMessageCount(DurabilityType.NON_DURABLE); + } + + public int getSubscriptionCount() + { + return getQueues(DurabilityType.ALL).size(); + } + + public int getDurableSubscriptionCount() + { + return getQueues(DurabilityType.DURABLE).size(); + } + + public int getNonDurableSubscriptionCount() + { + return getQueues(DurabilityType.NON_DURABLE).size(); + } + + public Object[] listAllSubscriptions() + { + return listSubscribersInfos(DurabilityType.ALL); + } + + public String listAllSubscriptionsAsJSON() throws Exception + { + return listSubscribersInfosAsJSON(DurabilityType.ALL); + } + + public Object[] listDurableSubscriptions() + { + return listSubscribersInfos(DurabilityType.DURABLE); + } + + public String listDurableSubscriptionsAsJSON() throws Exception + { + return listSubscribersInfosAsJSON(DurabilityType.DURABLE); + } + + public Object[] listNonDurableSubscriptions() + { + return listSubscribersInfos(DurabilityType.NON_DURABLE); + } + + public String listNonDurableSubscriptionsAsJSON() throws Exception + { + return listSubscribersInfosAsJSON(DurabilityType.NON_DURABLE); + } + + public Map<String, Object>[] listMessagesForSubscription(final String queueName) throws Exception + { + QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName); + if (coreQueueControl == null) + { + throw new IllegalArgumentException("No subscriptions with name " + queueName); + } + + Map<String, Object>[] coreMessages = coreQueueControl.listMessages(null); + + Map<String, Object>[] jmsMessages = new Map[coreMessages.length]; + + int i = 0; + + for (Map<String, Object> coreMessage : coreMessages) + { + jmsMessages[i++] = HornetQMessage.coreMaptoJMSMap(coreMessage); + } + return jmsMessages; + } + + public String listMessagesForSubscriptionAsJSON(final String queueName) throws Exception + { + return JMSQueueControlImpl.toJSON(listMessagesForSubscription(queueName)); + } + + public int countMessagesForSubscription(final String 
clientID, final String subscriptionName, final String filterStr) throws Exception + { + String queueName = HornetQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName); + QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName); + if (coreQueueControl == null) + { + throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID); + } + String filter = JMSTopicControlImpl.createFilterFromJMSSelector(filterStr); + return coreQueueControl.listMessages(filter).length; + } + + public int removeMessages(final String filterStr) throws Exception + { + String filter = JMSTopicControlImpl.createFilterFromJMSSelector(filterStr); + int count = 0; + String[] queues = addressControl.getQueueNames(); + for (String queue : queues) + { + QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue); + if (coreQueueControl != null) + { + count += coreQueueControl.removeMessages(filter); + } + } + + return count; + } + + public void dropDurableSubscription(final String clientID, final String subscriptionName) throws Exception + { + String queueName = HornetQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName); + QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName); + if (coreQueueControl == null) + { + throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID); + } + HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER); + serverControl.destroyQueue(queueName); + } + + public void dropAllSubscriptions() throws Exception + { + HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER); + String[] queues = addressControl.getQueueNames(); + for (String 
queue : queues) + { + // Drop all subscription shouldn't delete the dummy queue used to identify if the topic exists on the core queues. + // we will just ignore this queue + if (!queue.equals(managedTopic.getAddress())) + { + serverControl.destroyQueue(queue); + } + } + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + private Object[] listSubscribersInfos(final DurabilityType durability) + { + List<QueueControl> queues = getQueues(durability); + List<Object[]> subInfos = new ArrayList<Object[]>(queues.size()); + + for (QueueControl queue : queues) + { + String clientID = null; + String subName = null; + + if (queue.isDurable()) + { + Pair<String, String> pair = HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName() + .toString()); + clientID = pair.getA(); + subName = pair.getB(); + } + + String filter = queue.getFilter() != null ? 
queue.getFilter() : null; + + Object[] subscriptionInfo = new Object[6]; + subscriptionInfo[0] = queue.getName(); + subscriptionInfo[1] = clientID; + subscriptionInfo[2] = subName; + subscriptionInfo[3] = queue.isDurable(); + subscriptionInfo[4] = queue.getMessageCount(); + subscriptionInfo[5] = filter; + subInfos.add(subscriptionInfo); + } + return subInfos.toArray(new Object[subInfos.size()]); + } + + private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception + { + try + { + List<QueueControl> queues = getQueues(durability); + JSONArray array = new JSONArray(); + + for (QueueControl queue : queues) + { + String clientID = null; + String subName = null; + + if (queue.isDurable() && !queue.getName().startsWith(ResourceNames.JMS_TOPIC)) + { + Pair<String, String> pair = HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName() + .toString()); + clientID = pair.getA(); + subName = pair.getB(); + } + else if (queue.getName().startsWith(ResourceNames.JMS_TOPIC)) + { + // in the case of heirarchical topics the queue name will not follow the <part>.<part> pattern of normal + // durable subscribers so skip decomposing the name for the client ID and subscription name and just + // hard-code it + clientID = "HornetQ"; + subName = "HornetQ"; + } + + String filter = queue.getFilter() != null ? 
queue.getFilter() : null; + + JSONObject info = new JSONObject(); + + info.put("queueName", queue.getName()); + info.put("clientID", clientID); + info.put("selector", filter); + info.put("name", subName); + info.put("durable", queue.isDurable()); + info.put("messageCount", queue.getMessageCount()); + info.put("deliveringCount", queue.getDeliveringCount()); + info.put("consumers", new JSONArray(queue.listConsumersAsJSON()) ); + array.put(info); + } + + return array.toString(); + } + catch (Exception e) + { + e.printStackTrace(); + return e.toString(); + } + } + + private int getMessageCount(final DurabilityType durability) + { + List<QueueControl> queues = getQueues(durability); + int count = 0; + for (QueueControl queue : queues) + { + count += queue.getMessageCount(); + } + return count; + } + + private List<QueueControl> getQueues(final DurabilityType durability) + { + try + { + List<QueueControl> matchingQueues = new ArrayList<QueueControl>(); + String[] queues = addressControl.getQueueNames(); + for (String queue : queues) + { + QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue); + + // Ignore the "special" subscription + if (coreQueueControl != null && !coreQueueControl.getName().equals(addressControl.getAddress())) + { + if (durability == DurabilityType.ALL || durability == DurabilityType.DURABLE && + coreQueueControl.isDurable() || + durability == DurabilityType.NON_DURABLE && + !coreQueueControl.isDurable()) + { + matchingQueues.add(coreQueueControl); + } + } + } + return matchingQueues; + } + catch (Exception e) + { + return Collections.emptyList(); + } + } + + @Override + public MBeanInfo getMBeanInfo() + { + MBeanInfo info = super.getMBeanInfo(); + return new MBeanInfo(info.getClassName(), + info.getDescription(), + info.getAttributes(), + info.getConstructors(), + MBeanInfoHelper.getMBeanOperationsInfo(TopicControl.class), + info.getNotifications()); + } + + // Inner classes 
------------------------------------------------- + + private enum DurabilityType + { + ALL, DURABLE, NON_DURABLE + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/JMSStorageManager.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/JMSStorageManager.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/JMSStorageManager.java new file mode 100644 index 0000000..921c352 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/JMSStorageManager.java @@ -0,0 +1,54 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.jms.persistence; + +import java.util.List; + +import org.apache.activemq6.core.server.HornetQComponent; +import org.apache.activemq6.jms.persistence.config.PersistedConnectionFactory; +import org.apache.activemq6.jms.persistence.config.PersistedDestination; +import org.apache.activemq6.jms.persistence.config.PersistedJNDI; +import org.apache.activemq6.jms.persistence.config.PersistedType; + +/** + * A JMSPersistence + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public interface JMSStorageManager extends HornetQComponent +{ + + void load() throws Exception; + + void storeDestination(PersistedDestination destination) throws Exception; + + void deleteDestination(PersistedType type, String name) throws Exception; + + List<PersistedDestination> recoverDestinations(); + + void deleteConnectionFactory(String connectionFactory) throws Exception; + + void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception; + + List<PersistedConnectionFactory> recoverConnectionFactories(); + + void addJNDI(PersistedType type, String name, String ... 
address) throws Exception; + + List<PersistedJNDI> recoverPersistedJNDI() throws Exception; + + void deleteJNDI(PersistedType type, String name, String address) throws Exception; + + void deleteJNDI(PersistedType type, String name) throws Exception; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedConnectionFactory.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedConnectionFactory.java new file mode 100644 index 0000000..33e4bdb --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedConnectionFactory.java @@ -0,0 +1,111 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.jms.persistence.config; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.journal.EncodingSupport; +import org.apache.activemq6.jms.server.config.ConnectionFactoryConfiguration; +import org.apache.activemq6.jms.server.config.impl.ConnectionFactoryConfigurationImpl; + +/** + * A PersistedConnectionFactory + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class PersistedConnectionFactory implements EncodingSupport +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + private long id; + + private ConnectionFactoryConfiguration config; + + public PersistedConnectionFactory() + { + super(); + } + + /** + * @param config + */ + public PersistedConnectionFactory(final ConnectionFactoryConfiguration config) + { + super(); + this.config = config; + } + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + /** + * @return the id + */ + public long getId() + { + return id; + } + + public void setId(final long id) + { + this.id = id; + } + + public String getName() + { + return config.getName(); + } + + /** + * @return the config + */ + public ConnectionFactoryConfiguration getConfig() + { + return config; + } + + @Override + public void decode(final HornetQBuffer buffer) + { + config = new ConnectionFactoryConfigurationImpl(); + config.decode(buffer); + } + + @Override + public void encode(final HornetQBuffer buffer) + { + config.encode(buffer); + } + + @Override + public int getEncodeSize() + { + return config.getEncodeSize(); + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private 
------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedDestination.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedDestination.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedDestination.java new file mode 100644 index 0000000..3653773 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedDestination.java @@ -0,0 +1,128 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
*/
package org.apache.activemq6.jms.persistence.config;

import org.apache.activemq6.api.core.HornetQBuffer;
import org.apache.activemq6.api.core.SimpleString;
import org.apache.activemq6.core.journal.EncodingSupport;
import org.apache.activemq6.utils.BufferHelper;
import org.apache.activemq6.utils.DataConstants;

/**
 * Storage record describing a destination: its type, name, optional selector and
 * durability flag, plus a numeric id.
 * <p>
 * Encoded layout, in order: type (byte), name (simple string), selector (nullable simple
 * string), durable (boolean). The id is not part of the encoded form.
 *
 * @author Clebert Suconic
 */
public class PersistedDestination implements EncodingSupport
{
   // Constants -----------------------------------------------------

   // Attributes ----------------------------------------------------

   private long id;

   private PersistedType type;

   private String name;

   /** May be {@code null}; it is written and read as a nullable string. */
   private String selector;

   private boolean durable;

   // Static --------------------------------------------------------

   // Constructors --------------------------------------------------

   /**
    * Creates an empty record whose fields are filled in later by
    * {@link #decode(HornetQBuffer)}.
    */
   public PersistedDestination()
   {
   }

   /**
    * Creates a durable record without a selector.
    *
    * @param type the destination type
    * @param name the destination name
    */
   public PersistedDestination(final PersistedType type, final String name)
   {
      this(type, name, null, true);
   }

   /**
    * Creates a fully specified record.
    *
    * @param type     the destination type
    * @param name     the destination name
    * @param selector the selector, or {@code null} for none
    * @param durable  the durability flag
    */
   public PersistedDestination(final PersistedType type, final String name, final String selector, final boolean durable)
   {
      this.type = type;
      this.name = name;
      this.selector = selector;
      this.durable = durable;
   }

   // Public --------------------------------------------------------

   /**
    * @return the id
    */
   public long getId()
   {
      return id;
   }

   /**
    * @param id the id to assign to this record
    */
   public void setId(final long id)
   {
      this.id = id;
   }

   /**
    * @return the destination name
    */
   public String getName()
   {
      return name;
   }

   /**
    * @return the destination type
    */
   public PersistedType getType()
   {
      return type;
   }

   /**
    * @return the selector, or {@code null} when none is set
    */
   public String getSelector()
   {
      return selector;
   }

   /**
    * @return the durability flag
    */
   public boolean isDurable()
   {
      return durable;
   }

   // EncodingSupport implementation --------------------------------

   /**
    * @return the number of bytes {@link #encode(HornetQBuffer)} writes: one byte for the type,
    *         the name, the nullable selector and one boolean
    */
   @Override
   public int getEncodeSize()
   {
      return DataConstants.SIZE_BYTE +
         BufferHelper.sizeOfSimpleString(name) +
         BufferHelper.sizeOfNullableSimpleString(selector) +
         DataConstants.SIZE_BOOLEAN;
   }

   /**
    * Writes type, name, selector and durability to the buffer, in that order.
    * The type must be set, since its byte code is read from it.
    */
   @Override
   public void encode(final HornetQBuffer buffer)
   {
      buffer.writeByte(type.getType());
      buffer.writeSimpleString(SimpleString.toSimpleString(name));
      buffer.writeNullableSimpleString(SimpleString.toSimpleString(selector));
      buffer.writeBoolean(durable);
   }

   /**
    * Reads type, name, selector and durability from the buffer, in the order
    * {@link #encode(HornetQBuffer)} wrote them. A missing selector is restored as {@code null}.
    */
   @Override
   public void decode(final HornetQBuffer buffer)
   {
      type = PersistedType.getType(buffer.readByte());
      name = buffer.readSimpleString().toString();
      SimpleString selectorStr = buffer.readNullableSimpleString();
      selector = (selectorStr == null) ? null : selectorStr.toString();
      durable = buffer.readBoolean();
   }

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------
}
