http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/JMSQueueConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/JMSQueueConfigurationImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/JMSQueueConfigurationImpl.java new file mode 100644 index 0000000..a81aa88 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/JMSQueueConfigurationImpl.java @@ -0,0 +1,104 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.jms.server.config.impl; + +import org.apache.activemq6.jms.server.config.JMSQueueConfiguration; + + +/** + * A QueueConfigurationImpl + * + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * + * + */ +public class JMSQueueConfigurationImpl implements JMSQueueConfiguration +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + private String name = null; + + private String selector = null; + + private boolean durable = true; + + private String[] bindings = null; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public JMSQueueConfigurationImpl() + { + } + + // QueueConfiguration implementation ----------------------------- + + public String[] getBindings() + { + return bindings; + } + + public JMSQueueConfigurationImpl setBindings(String... bindings) + { + this.bindings = bindings; + return this; + } + + public String getName() + { + return name; + } + + public JMSQueueConfigurationImpl setName(String name) + { + this.name = name; + return this; + } + + public String getSelector() + { + return selector; + } + + public JMSQueueConfigurationImpl setSelector(String selector) + { + this.selector = selector; + return this; + } + + public boolean isDurable() + { + return durable; + } + + public JMSQueueConfigurationImpl setDurable(boolean durable) + { + this.durable = durable; + return this; + } + + // Public -------------------------------------------------------- + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/TopicConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/TopicConfigurationImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/TopicConfigurationImpl.java new file mode 100644 index 0000000..c495e9e --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/TopicConfigurationImpl.java @@ -0,0 +1,78 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.jms.server.config.impl; + +import org.apache.activemq6.jms.server.config.TopicConfiguration; + + +/** + * A TopicConfigurationImpl + * + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * + * + */ +public class TopicConfigurationImpl implements TopicConfiguration +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + private String name; + + private String[] bindings; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public TopicConfigurationImpl() + { + } + + // TopicConfiguration implementation ----------------------------- + + public String[] getBindings() + { + return bindings; + } + + public TopicConfigurationImpl setBindings(String... bindings) + { + this.bindings = bindings; + return this; + } + + public String getName() + { + return name; + } + + public TopicConfigurationImpl setName(String name) + { + this.name = name; + return this; + } + + // Public -------------------------------------------------------- + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/TransportConfigurationEncodingSupport.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/TransportConfigurationEncodingSupport.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/TransportConfigurationEncodingSupport.java new file mode 100644 
// File: activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/config/impl/TransportConfigurationEncodingSupport.java
// (added as a new file at revision 23e8edd9 of the activemq-6 repository)
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.jms.server.config.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.activemq6.api.core.HornetQBuffer;
import org.apache.activemq6.api.core.Pair;
import org.apache.activemq6.api.core.TransportConfiguration;
import org.apache.activemq6.utils.BufferHelper;
import org.apache.activemq6.utils.DataConstants;

/**
 * Static helpers that write {@link TransportConfiguration} objects (singly, or as a
 * list of live/backup pairs) to a {@link HornetQBuffer}, read them back, and compute
 * the number of bytes the encoded form occupies.
 * <p>
 * Layout of one configuration, as written by {@link #encode}: nullable name, factory
 * class name, an int parameter count, then one key string and one value string per
 * parameter. Layout of a pair list, as written by {@link #encodeConfigs}: an int pair
 * count, then for each pair the live configuration, a boolean "has backup" flag and,
 * when the flag is true, the backup configuration.
 *
 * @author Jeff Mesnil
 */
public class TransportConfigurationEncodingSupport
{
   /**
    * Reads a list of live/backup pairs previously written by {@link #encodeConfigs}.
    *
    * @param buffer the buffer to read from, positioned at the pair count
    * @return the decoded pairs; the second element of a pair is {@code null} when no
    *         backup was encoded
    */
   public static List<Pair<TransportConfiguration, TransportConfiguration>> decodeConfigs(HornetQBuffer buffer)
   {
      final int pairCount = buffer.readInt();
      final List<Pair<TransportConfiguration, TransportConfiguration>> decoded =
         new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>(pairCount);

      int remaining = pairCount;
      while (remaining > 0)
      {
         // Read order matters: live config, then the flag, then the optional backup.
         final TransportConfiguration live = decode(buffer);
         final TransportConfiguration backup = buffer.readBoolean() ? decode(buffer) : null;
         decoded.add(new Pair<TransportConfiguration, TransportConfiguration>(live, backup));
         remaining--;
      }

      return decoded;
   }

   /**
    * Reads a single configuration previously written by {@link #encode}.
    * <p>
    * Every parameter value comes back as a {@code String}, because that is how it is
    * stored on the wire.
    *
    * @param buffer the buffer to read from, positioned at the nullable name
    * @return a new configuration built from the factory class name, parameters and name read
    */
   public static TransportConfiguration decode(HornetQBuffer buffer)
   {
      final String configName = BufferHelper.readNullableSimpleStringAsString(buffer);
      final String factoryClass = buffer.readSimpleString().toString();
      final int paramCount = buffer.readInt();

      final Map<String, Object> decodedParams = new HashMap<String, Object>();
      for (int read = 0; read < paramCount; read++)
      {
         final String paramKey = buffer.readSimpleString().toString();
         final String paramValue = buffer.readSimpleString().toString();
         decodedParams.put(paramKey, paramValue);
      }

      return new TransportConfiguration(factoryClass, decodedParams, configName);
   }

   /**
    * Writes a list of live/backup pairs. A {@code null} list is written as a count of
    * zero and nothing else.
    *
    * @param buffer  the buffer to write to
    * @param configs the pairs to write, or {@code null}
    */
   public static void encodeConfigs(HornetQBuffer buffer,
                                    List<Pair<TransportConfiguration, TransportConfiguration>> configs)
   {
      if (configs == null)
      {
         buffer.writeInt(0);
         return;
      }

      buffer.writeInt(configs.size());
      for (Pair<TransportConfiguration, TransportConfiguration> liveAndBackup : configs)
      {
         encode(buffer, liveAndBackup.getA());

         final TransportConfiguration backupConfig = liveAndBackup.getB();
         final boolean backupPresent = backupConfig != null;
         buffer.writeBoolean(backupPresent);
         if (backupPresent)
         {
            encode(buffer, backupConfig);
         }
      }
   }

   /**
    * Writes a single configuration: nullable name, factory class name, parameter count,
    * then each parameter as key string followed by {@code value.toString()}.
    * <p>
    * A {@code null} parameter value makes the {@code toString()} call throw a
    * {@code NullPointerException}.
    *
    * @param buffer the buffer to write to
    * @param config the configuration to write
    */
   public static void encode(HornetQBuffer buffer, TransportConfiguration config)
   {
      BufferHelper.writeAsNullableSimpleString(buffer, config.getName());
      BufferHelper.writeAsSimpleString(buffer, config.getFactoryClassName());

      final Map<String, Object> configParams = config.getParams();
      buffer.writeInt(configParams.size());
      for (Entry<String, Object> keyValue : configParams.entrySet())
      {
         BufferHelper.writeAsSimpleString(buffer, keyValue.getKey());
         BufferHelper.writeAsSimpleString(buffer, keyValue.getValue().toString());
      }
   }

   /**
    * Computes the number of bytes {@link #encode} writes for the given configuration.
    *
    * @param config the configuration to measure
    * @return the size of the name, the factory class name, the int parameter count and
    *         every parameter key and value string
    */
   public static int getEncodeSize(TransportConfiguration config)
   {
      int total = BufferHelper.sizeOfNullableSimpleString(config.getName());
      total += BufferHelper.sizeOfSimpleString(config.getFactoryClassName());
      total += DataConstants.SIZE_INT; // slot holding the number of params

      for (Entry<String, Object> keyValue : config.getParams().entrySet())
      {
         total += BufferHelper.sizeOfSimpleString(keyValue.getKey());
         total += BufferHelper.sizeOfSimpleString(keyValue.getValue().toString());
      }
      return total;
   }

   /**
    * Computes the number of bytes {@link #encodeConfigs} writes for the given list.
    *
    * @param configs the pairs to measure, or {@code null}
    * @return the size of the int pair count plus, for each pair, the live
    *         configuration, one boolean and the backup configuration when present
    */
   public static int getEncodeSize(List<Pair<TransportConfiguration, TransportConfiguration>> configs)
   {
      int total = DataConstants.SIZE_INT; // slot holding the number of pairs
      if (configs == null)
      {
         return total;
      }

      for (Pair<TransportConfiguration, TransportConfiguration> liveAndBackup : configs)
      {
         total += getEncodeSize(liveAndBackup.getA());
         total += DataConstants.SIZE_BOOLEAN; // "has backup" flag
         final TransportConfiguration backupConfig = liveAndBackup.getB();
         if (backupConfig != null)
         {
            total += getEncodeSize(backupConfig);
         }
      }
      return total;
   }
}
// Next file in this patch:
// activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/embedded/EmbeddedJMS.java
// File: activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/embedded/EmbeddedJMS.java
// (added as a new file at revision 23e8edd9 of the activemq-6 repository)
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.jms.server.embedded;

import javax.naming.Context;

import org.apache.activemq6.core.registry.JndiBindingRegistry;
import org.apache.activemq6.core.registry.MapBindingRegistry;
import org.apache.activemq6.core.server.embedded.EmbeddedHornetQ;
import org.apache.activemq6.jms.server.config.JMSConfiguration;
import org.apache.activemq6.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq6.spi.core.naming.BindingRegistry;

/**
 * Bootstrap class that starts an embedded server together with a
 * {@link JMSServerManagerImpl} that populates it with the configured JMS endpoints.
 * <p>
 * The JMS configuration comes from, in order of precedence: a {@link JMSConfiguration}
 * object set via {@link #setJmsConfiguration}, a classpath resource set via
 * {@link #setJmsConfigResourcePath}, or whatever {@code JMSServerManagerImpl} uses
 * when given neither.
 * <p>
 * Endpoints are registered in a {@link MapBindingRegistry} unless a registry was set
 * with {@link #setRegistry} or a JNDI context was set with {@link #setContext}.
 * <p>
 * NOTE(review): {@code serverManager} is only assigned by {@link #start()} in this
 * class, so calling {@link #lookup(String)} or {@link #stop()} first dereferences
 * {@code null}, unless a subclass assigns the protected field itself.
 *
 * @author Bill Burke
 * @version $Revision: 1 $
 */
public class EmbeddedJMS extends EmbeddedHornetQ
{
   /** JMS server manager; created by {@link #start()}. */
   protected JMSServerManagerImpl serverManager;

   /** Registry handed to the server manager; chosen in {@link #start()} when left {@code null}. */
   protected BindingRegistry registry;

   /** Classpath location of the JMS config file, or {@code null} to let the server manager decide. */
   protected String jmsConfigResourcePath;

   /** Programmatic JMS configuration; takes precedence over {@link #jmsConfigResourcePath}. */
   protected JMSConfiguration jmsConfiguration;

   /** JNDI context used to build a {@link JndiBindingRegistry} when no registry is set. */
   protected Context context;

   /**
    * Sets the classpath resource where the JMS config file is. The original
    * documentation gives the default as 'hornetq-jms.xml'; that default is not applied
    * in this class (presumably by {@code JMSServerManagerImpl} - confirm).
    *
    * @param jmsConfigResourcePath classpath resource path of the JMS config file
    */
   public void setJmsConfigResourcePath(String jmsConfigResourcePath)
   {
      this.jmsConfigResourcePath = jmsConfigResourcePath;
   }

   /**
    * By default, this class uses file-based configuration. Set this property to
    * override it.
    *
    * @param jmsConfiguration the configuration object to use instead of a config file
    */
   public void setJmsConfiguration(JMSConfiguration jmsConfiguration)
   {
      this.jmsConfiguration = jmsConfiguration;
   }

   /**
    * If you want to use JNDI instead of an internal map, set this property. It is only
    * consulted by {@link #start()} when no registry has been set.
    *
    * @param context the JNDI context to register endpoints in
    */
   public void setContext(Context context)
   {
      this.context = context;
   }

   /**
    * @return the {@code registry} field: the registry set explicitly, the one chosen
    *         by {@link #start()}, or {@code null} if neither has happened yet
    */
   public BindingRegistry getRegistry()
   {
      return this.registry;
   }

   /**
    * Only set this property if you are using a custom BindingRegistry.
    *
    * @param registry the registry to hand to the server manager
    */
   public void setRegistry(BindingRegistry registry)
   {
      this.registry = registry;
   }

   /**
    * Convenience method that looks up a registered object, e.g. a ConnectionFactory,
    * in the registry held by the server manager.
    *
    * @param name the name to look up
    * @return whatever the server manager's registry returns for {@code name}
    */
   public Object lookup(String name)
   {
      final BindingRegistry managerRegistry = serverManager.getRegistry();
      return managerRegistry.lookup(name);
   }

   /**
    * Runs the superclass {@code initStart()}, creates the JMS server manager, selects a
    * registry if none was set, hands it to the manager and starts the manager.
    *
    * @throws Exception if initialisation, manager construction or manager start fails
    */
   @Override
   public void start() throws Exception
   {
      super.initStart();

      serverManager = createServerManager();

      if (registry == null)
      {
         registry = createDefaultRegistry();
      }

      serverManager.setRegistry(registry);
      serverManager.start();
   }

   /**
    * Stops the JMS server manager. The superclass {@code stop()} is not invoked here.
    *
    * @throws Exception if the manager fails to stop
    */
   @Override
   public void stop() throws Exception
   {
      serverManager.stop();
   }

   /**
    * Builds the server manager from the configuration object if one is set, else from
    * the resource path if one is set, else from the server alone.
    */
   private JMSServerManagerImpl createServerManager() throws Exception
   {
      if (jmsConfiguration != null)
      {
         return new JMSServerManagerImpl(hornetQServer, jmsConfiguration);
      }
      if (jmsConfigResourcePath != null)
      {
         return new JMSServerManagerImpl(hornetQServer, jmsConfigResourcePath);
      }
      return new JMSServerManagerImpl(hornetQServer);
   }

   /**
    * Picks the registry used when none was set: JNDI-backed if a context is available,
    * map-backed otherwise.
    */
   private BindingRegistry createDefaultRegistry()
   {
      if (context == null)
      {
         return new MapBindingRegistry();
      }
      return new JndiBindingRegistry(context);
   }
}
// Next file in this patch:
// activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/impl/JMSServerConfigParserImpl.java
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.server.impl; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq6.api.config.HornetQDefaultConfiguration; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.client.HornetQClient; +import org.apache.activemq6.api.jms.JMSFactoryType; +import org.apache.activemq6.core.config.impl.Validators; +import org.apache.activemq6.jms.server.HornetQJMSServerBundle; +import org.apache.activemq6.jms.server.HornetQJMSServerLogger; +import org.apache.activemq6.jms.server.JMSServerConfigParser; +import org.apache.activemq6.jms.server.config.ConnectionFactoryConfiguration; +import org.apache.activemq6.jms.server.config.JMSConfiguration; +import org.apache.activemq6.jms.server.config.JMSQueueConfiguration; +import org.apache.activemq6.jms.server.config.TopicConfiguration; +import org.apache.activemq6.jms.server.config.impl.ConnectionFactoryConfigurationImpl; +import org.apache.activemq6.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq6.jms.server.config.impl.JMSQueueConfigurationImpl; +import org.apache.activemq6.jms.server.config.impl.TopicConfigurationImpl; +import org.apache.activemq6.utils.XMLConfigurationUtil; +import org.apache.activemq6.utils.XMLUtil; +import org.w3c.dom.Element; +import org.w3c.dom.NamedNodeMap; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +/** + * JMS Configuration File Parser. 
+ * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public final class JMSServerConfigParserImpl implements JMSServerConfigParser +{ + protected static final String NAME_ATTR = "name"; + + public JMSServerConfigParserImpl() + { + } + + /** + * Parse the JMS Configuration XML as a JMSConfiguration object + */ + public JMSConfiguration parseConfiguration(final InputStream stream) throws Exception + { + Reader reader = new InputStreamReader(stream); + String xml = org.apache.activemq6.utils.XMLUtil.readerToString(reader); + xml = XMLUtil.replaceSystemProps(xml); + return parseConfiguration(XMLUtil.stringToElement(xml)); + } + + /** + * Parse the JMS Configuration XML as a JMSConfiguration object + */ + public JMSConfiguration parseConfiguration(final Node rootnode) throws Exception + { + + ArrayList<JMSQueueConfiguration> queues = new ArrayList<JMSQueueConfiguration>(); + ArrayList<TopicConfiguration> topics = new ArrayList<TopicConfiguration>(); + ArrayList<ConnectionFactoryConfiguration> cfs = new ArrayList<ConnectionFactoryConfiguration>(); + String domain = HornetQDefaultConfiguration.getDefaultJmxDomain(); + + Element e = (Element) rootnode; + + org.apache.activemq6.utils.XMLUtil.validate(rootnode, "schema/hornetq-jms.xsd"); + + String[] elements = new String[]{JMSServerDeployer.QUEUE_NODE_NAME, + JMSServerDeployer.TOPIC_NODE_NAME, + JMSServerDeployer.CONNECTION_FACTORY_NODE_NAME}; + for (String element : elements) + { + NodeList children = e.getElementsByTagName(element); + for (int i = 0; i < children.getLength(); i++) + { + Node node = children.item(i); + Node keyNode = node.getAttributes().getNamedItem(JMSServerConfigParserImpl.NAME_ATTR); + if (keyNode == null) + { + HornetQJMSServerLogger.LOGGER.jmsConfigMissingKey(node); + continue; + } + + if (node.getNodeName().equals(JMSServerDeployer.CONNECTION_FACTORY_NODE_NAME)) + { + cfs.add(parseConnectionFactoryConfiguration(node)); + } + else if 
(node.getNodeName().equals(JMSServerDeployer.TOPIC_NODE_NAME)) + { + topics.add(parseTopicConfiguration(node)); + } + else if (node.getNodeName().equals(JMSServerDeployer.QUEUE_NODE_NAME)) + { + queues.add(parseQueueConfiguration(node)); + } + } + } + + domain = XMLConfigurationUtil.getString(e, JMSServerDeployer.JMX_DOMAIN_NAME, HornetQDefaultConfiguration.getDefaultJmxDomain(), Validators.NO_CHECK); + + + JMSConfiguration value = newConfig(queues, topics, cfs, domain); + + return value; + } + + /** + * Parse the topic node as a TopicConfiguration object + * + * @param node + * @return topic configuration + * @throws Exception + */ + public TopicConfiguration parseTopicConfiguration(final Node node) throws Exception + { + String topicName = node.getAttributes().getNamedItem(JMSServerConfigParserImpl.NAME_ATTR).getNodeValue(); + NodeList children = node.getChildNodes(); + ArrayList<String> jndiNames = new ArrayList<String>(); + for (int i = 0; i < children.getLength(); i++) + { + Node child = children.item(i); + + if (JMSServerDeployer.ENTRY_NODE_NAME.equals(children.item(i).getNodeName())) + { + String jndiElement = child.getAttributes().getNamedItem("name").getNodeValue(); + jndiNames.add(jndiElement); + } + } + + String[] strBindings = jndiNames.toArray(new String[jndiNames.size()]); + + return newTopic(topicName, strBindings); + + } + + /** + * Parse the Queue Configuration node as a QueueConfiguration object + * + * @param node + * @return jms queue configuration + * @throws Exception + */ + public JMSQueueConfiguration parseQueueConfiguration(final Node node) throws Exception + { + Element e = (Element) node; + NamedNodeMap atts = node.getAttributes(); + String queueName = atts.getNamedItem(JMSServerConfigParserImpl.NAME_ATTR).getNodeValue(); + String selectorString = null; + boolean durable = XMLConfigurationUtil.getBoolean(e, "durable", JMSServerDeployer.DEFAULT_QUEUE_DURABILITY); + NodeList children = node.getChildNodes(); + ArrayList<String> jndiNames = 
new ArrayList<String>(); + for (int i = 0; i < children.getLength(); i++) + { + Node child = children.item(i); + + if (JMSServerDeployer.ENTRY_NODE_NAME.equals(children.item(i).getNodeName())) + { + String jndiName = child.getAttributes().getNamedItem("name").getNodeValue(); + jndiNames.add(jndiName); + } + else if (JMSServerDeployer.QUEUE_SELECTOR_NODE_NAME.equals(children.item(i).getNodeName())) + { + Node selectorNode = children.item(i); + Node attNode = selectorNode.getAttributes().getNamedItem("string"); + selectorString = attNode.getNodeValue(); + } + } + + String[] jndiArray = jndiNames.toArray(new String[jndiNames.size()]); + return newQueue(queueName, selectorString, durable, jndiArray); + } + + /** + * Parse the Connection Configuration node as a ConnectionFactoryConfiguration object + * + * @param node + * @return ConnectionFactoryConfiguration + * @throws Exception + */ + public ConnectionFactoryConfiguration parseConnectionFactoryConfiguration(final Node node) throws Exception + { + if (!node.getNodeName().equals(JMSServerDeployer.CONNECTION_FACTORY_NODE_NAME)) + { + // sanity check, this shouldn't ever happen + throw HornetQJMSServerBundle.BUNDLE.invalidNodeParsingCF(node.getNodeName()); + } + Element e = (Element) node; + + String name = node.getAttributes().getNamedItem(JMSServerConfigParserImpl.NAME_ATTR).getNodeValue(); + + String fact = e.getAttribute("signature"); + boolean isXA = XMLConfigurationUtil.getBoolean(e, + "xa", + HornetQClient.DEFAULT_XA); + + JMSFactoryType factType = resolveFactoryType(fact, isXA); + + long clientFailureCheckPeriod = XMLConfigurationUtil.getLong(e, + "client-failure-check-period", + HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, + Validators.MINUS_ONE_OR_GT_ZERO); + long connectionTTL = XMLConfigurationUtil.getLong(e, + "connection-ttl", + HornetQClient.DEFAULT_CONNECTION_TTL, + Validators.MINUS_ONE_OR_GE_ZERO); + long callTimeout = XMLConfigurationUtil.getLong(e, + "call-timeout", + 
HornetQClient.DEFAULT_CALL_TIMEOUT, + Validators.GE_ZERO); + long callFailoverTimeout = XMLConfigurationUtil.getLong(e, + "call-failover-timeout", + HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, + Validators.MINUS_ONE_OR_GT_ZERO); + String clientID = XMLConfigurationUtil.getString(e, "client-id", null, Validators.NO_CHECK); + int dupsOKBatchSize = XMLConfigurationUtil.getInteger(e, + "dups-ok-batch-size", + HornetQClient.DEFAULT_ACK_BATCH_SIZE, + Validators.GT_ZERO); + int transactionBatchSize = XMLConfigurationUtil.getInteger(e, + "transaction-batch-size", + HornetQClient.DEFAULT_ACK_BATCH_SIZE, + Validators.GT_ZERO); + int consumerWindowSize = XMLConfigurationUtil.getInteger(e, + "consumer-window-size", + HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE, + Validators.MINUS_ONE_OR_GE_ZERO); + int producerWindowSize = XMLConfigurationUtil.getInteger(e, + "producer-window-size", + HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE, + Validators.MINUS_ONE_OR_GT_ZERO); + int consumerMaxRate = XMLConfigurationUtil.getInteger(e, + "consumer-max-rate", + HornetQClient.DEFAULT_CONSUMER_MAX_RATE, + Validators.MINUS_ONE_OR_GT_ZERO); + int confirmationWindowSize = XMLConfigurationUtil.getInteger(e, + "confirmation-window-size", + HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, + Validators.MINUS_ONE_OR_GT_ZERO); + int producerMaxRate = XMLConfigurationUtil.getInteger(e, + "producer-max-rate", + HornetQClient.DEFAULT_PRODUCER_MAX_RATE, + Validators.MINUS_ONE_OR_GT_ZERO); + boolean cacheLargeMessagesClient = XMLConfigurationUtil.getBoolean(e, + "cache-large-message-client", + HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT); + int minLargeMessageSize = XMLConfigurationUtil.getInteger(e, + "min-large-message-size", + HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, + Validators.GT_ZERO); + + boolean compressLargeMessages = XMLConfigurationUtil.getBoolean(e, + "compress-large-messages", + HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES); + + boolean blockOnAcknowledge = 
XMLConfigurationUtil.getBoolean(e, + "block-on-acknowledge", + HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE); + boolean blockOnNonDurableSend = XMLConfigurationUtil.getBoolean(e, + "block-on-non-durable-send", + HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND); + boolean blockOnDurableSend = XMLConfigurationUtil.getBoolean(e, + "block-on-durable-send", + HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND); + boolean autoGroup = XMLConfigurationUtil.getBoolean(e, "auto-group", HornetQClient.DEFAULT_AUTO_GROUP); + boolean preAcknowledge = XMLConfigurationUtil.getBoolean(e, + "pre-acknowledge", + HornetQClient.DEFAULT_PRE_ACKNOWLEDGE); + long retryInterval = XMLConfigurationUtil.getLong(e, + "retry-interval", + HornetQClient.DEFAULT_RETRY_INTERVAL, + Validators.GT_ZERO); + double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e, + "retry-interval-multiplier", + HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, + Validators.GT_ZERO); + long maxRetryInterval = XMLConfigurationUtil.getLong(e, + "max-retry-interval", + HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, + Validators.GT_ZERO); + int reconnectAttempts = XMLConfigurationUtil.getInteger(e, + "reconnect-attempts", + HornetQClient.DEFAULT_RECONNECT_ATTEMPTS, + Validators.MINUS_ONE_OR_GE_ZERO); + boolean failoverOnInitialConnection = XMLConfigurationUtil.getBoolean(e, + "failover-on-initial-connection", + HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION); + + boolean useGlobalPools = XMLConfigurationUtil.getBoolean(e, + "use-global-pools", + HornetQClient.DEFAULT_USE_GLOBAL_POOLS); + int scheduledThreadPoolMaxSize = XMLConfigurationUtil.getInteger(e, + "scheduled-thread-pool-max-size", + HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, + Validators.MINUS_ONE_OR_GT_ZERO); + int threadPoolMaxSize = XMLConfigurationUtil.getInteger(e, + "thread-pool-max-size", + HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE, + Validators.MINUS_ONE_OR_GT_ZERO); + String connectionLoadBalancingPolicyClassName = 
XMLConfigurationUtil.getString(e, + "connection-load-balancing-policy-class-name", + HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, + Validators.NOT_NULL_OR_EMPTY); + boolean ha = XMLConfigurationUtil.getBoolean(e, "ha", HornetQClient.DEFAULT_HA); + + String groupid = XMLConfigurationUtil.getString(e, "group-id", null, Validators.NO_CHECK); + List<String> jndiBindings = new ArrayList<String>(); + List<String> connectorNames = new ArrayList<String>(); + String discoveryGroupName = null; + + NodeList children = node.getChildNodes(); + + for (int j = 0; j < children.getLength(); j++) + { + Node child = children.item(j); + + if (JMSServerDeployer.ENTRIES_NODE_NAME.equals(child.getNodeName())) + { + NodeList entries = child.getChildNodes(); + for (int i = 0; i < entries.getLength(); i++) + { + Node entry = entries.item(i); + if (JMSServerDeployer.ENTRY_NODE_NAME.equals(entry.getNodeName())) + { + String jndiName = entry.getAttributes().getNamedItem("name").getNodeValue(); + + jndiBindings.add(jndiName); + } + } + } + else if (JMSServerDeployer.CONNECTORS_NODE_NAME.equals(child.getNodeName())) + { + NodeList entries = child.getChildNodes(); + for (int i = 0; i < entries.getLength(); i++) + { + Node entry = entries.item(i); + if (JMSServerDeployer.CONNECTOR_REF_ELEMENT.equals(entry.getNodeName())) + { + String connectorName = entry.getAttributes().getNamedItem("connector-name").getNodeValue(); + + connectorNames.add(connectorName); + } + } + } + else if (JMSServerDeployer.DISCOVERY_GROUP_ELEMENT.equals(child.getNodeName())) + { + discoveryGroupName = child.getAttributes().getNamedItem("discovery-group-name").getNodeValue(); + + } + } + + ConnectionFactoryConfiguration cfConfig; + + String[] strbindings = jndiBindings.toArray(new String[jndiBindings.size()]); + + if (discoveryGroupName != null) + { + cfConfig = new ConnectionFactoryConfigurationImpl() + .setDiscoveryGroupName(discoveryGroupName); + } + else + { + ArrayList<String> connectors = new 
ArrayList<String>(connectorNames.size()); + for (String connectorName : connectorNames) + { + connectors.add(connectorName); + } + cfConfig = new ConnectionFactoryConfigurationImpl() + .setConnectorNames(connectors); + } + + cfConfig + .setName(name) + .setHA(ha) + .setBindings(strbindings) + .setFactoryType(factType) + .setClientID(clientID) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setConnectionTTL(connectionTTL) + .setCallTimeout(callTimeout) + .setCallFailoverTimeout(callFailoverTimeout) + .setCacheLargeMessagesClient(cacheLargeMessagesClient) + .setMinLargeMessageSize(minLargeMessageSize) + .setCompressLargeMessages(compressLargeMessages) + .setConsumerWindowSize(consumerWindowSize) + .setConsumerMaxRate(consumerMaxRate) + .setConfirmationWindowSize(confirmationWindowSize) + .setProducerWindowSize(producerWindowSize) + .setProducerMaxRate(producerMaxRate) + .setBlockOnAcknowledge(blockOnAcknowledge) + .setBlockOnDurableSend(blockOnDurableSend) + .setBlockOnNonDurableSend(blockOnNonDurableSend) + .setAutoGroup(autoGroup) + .setPreAcknowledge(preAcknowledge) + .setLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName) + .setTransactionBatchSize(transactionBatchSize) + .setDupsOKBatchSize(dupsOKBatchSize) + .setUseGlobalPools(useGlobalPools) + .setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize) + .setThreadPoolMaxSize(threadPoolMaxSize) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setMaxRetryInterval(maxRetryInterval) + .setReconnectAttempts(reconnectAttempts) + .setFailoverOnInitialConnection(failoverOnInitialConnection) + .setGroupID(groupid); + + return cfConfig; + } + /** Resolves the factory signature name (generic, queue or topic; an empty string counts as generic) to the XA or non-XA JMSFactoryType selected by isXA; any other value, null included, ends in the exception built by HornetQJMSServerBundle.BUNDLE.invalidSignatureParsingCF. */ + private JMSFactoryType resolveFactoryType(String fact, boolean isXA) throws HornetQException + { + if ("".equals(fact)) + { + fact = "generic"; + } + if (isXA) + { + if ("generic".equals(fact)) + { + return JMSFactoryType.XA_CF; + } + if ("queue".equals(fact)) + { + return JMSFactoryType.QUEUE_XA_CF; + } + 
if ("topic".equals(fact)) + { + return JMSFactoryType.TOPIC_XA_CF; + } + } + else + { + if ("generic".equals(fact)) + { + return JMSFactoryType.CF; + } + if ("queue".equals(fact)) + { + return JMSFactoryType.QUEUE_CF; + } + if ("topic".equals(fact)) + { + return JMSFactoryType.TOPIC_CF; + } + } + throw HornetQJMSServerBundle.BUNDLE.invalidSignatureParsingCF(fact); + } + + /** + * Hook for integration layers: creates the configuration object for one topic. + * + * @param topicName name set on the new configuration + * @param strBindings bindings set on the new configuration + * @return a new TopicConfigurationImpl holding the given name and bindings + */ + protected TopicConfiguration newTopic(final String topicName, final String[] strBindings) + { + return new TopicConfigurationImpl() + .setName(topicName) + .setBindings(strBindings); + } + + /** + * Hook for integration layers: creates the configuration object for one queue. + * + * @param queueName name set on the new configuration + * @param selectorString selector set on the new configuration, passed through unchanged + * @param durable durable flag set on the new configuration + * @param jndiArray bindings set on the new configuration + * @return a new JMSQueueConfigurationImpl holding the four given values + */ + protected JMSQueueConfiguration newQueue(final String queueName, + final String selectorString, + final boolean durable, + final String[] jndiArray) + { + return new JMSQueueConfigurationImpl(). + setName(queueName). + setSelector(selectorString). + setDurable(durable). 
+ setBindings(jndiArray); + } + + /** + * Hook for integration layers: assembles the overall JMS configuration. + * + * @param queues queue configurations to include + * @param topics topic configurations to include + * @param cfs connection factory configurations to include + * @param domain value handed to setDomain + * @return a new JMSConfigurationImpl holding the given lists and domain + */ + protected JMSConfiguration newConfig(final ArrayList<JMSQueueConfiguration> queues, + final ArrayList<TopicConfiguration> topics, + final ArrayList<ConnectionFactoryConfiguration> cfs, String domain) + { + return new JMSConfigurationImpl() + .setConnectionFactoryConfigurations(cfs) + .setQueueConfigurations(queues) + .setTopicConfigurations(topics) + .setDomain(domain); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/impl/JMSServerDeployer.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/impl/JMSServerDeployer.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/impl/JMSServerDeployer.java new file mode 100644 index 0000000..ec3dbd9 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/impl/JMSServerDeployer.java @@ -0,0 +1,185 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.jms.server.impl; + +import org.apache.activemq6.core.deployers.DeploymentManager; +import org.apache.activemq6.core.deployers.impl.XmlDeployer; +import org.apache.activemq6.jms.server.JMSServerConfigParser; +import org.apache.activemq6.jms.server.JMSServerManager; +import org.apache.activemq6.jms.server.config.ConnectionFactoryConfiguration; +import org.apache.activemq6.jms.server.config.JMSQueueConfiguration; +import org.apache.activemq6.jms.server.config.TopicConfiguration; +import org.w3c.dom.Node; + +/** Deployer for the queue, topic and connection-factory elements of hornetq-jms.xml, the default configuration file; each element is parsed and then created through the JMSServerManager. + * @author <a href="[email protected]">Andy Taylor</a> + * @author <a href="[email protected]">Tim Fox</a> + * @author <a href="[email protected]">Jeff Mesnil</a> + */ +public class JMSServerDeployer extends XmlDeployer +{ + private final JMSServerConfigParser parser; + + private final JMSServerManager jmsServerManager; + + protected static final String CONNECTOR_REF_ELEMENT = "connector-ref"; + + protected static final String DISCOVERY_GROUP_ELEMENT = "discovery-group-ref"; + + protected static final String ENTRIES_NODE_NAME = "entries"; + + protected static final String ENTRY_NODE_NAME = "entry"; + + protected static final String CONNECTORS_NODE_NAME = "connectors"; + + protected static final String CONNECTION_FACTORY_NODE_NAME = "connection-factory"; + + protected static final String QUEUE_NODE_NAME = "queue"; + + protected static final String QUEUE_SELECTOR_NODE_NAME = "selector"; + + protected static final String TOPIC_NODE_NAME = "topic"; + + protected static final String JMX_DOMAIN_NAME = "jmx-domain"; + + protected static final boolean DEFAULT_QUEUE_DURABILITY = true; + /** Hands deploymentManager to the XmlDeployer constructor, keeps jmsServerManager for the later deploy and undeploy calls, and creates the JMSServerConfigParserImpl used to parse nodes. */ + public JMSServerDeployer(final JMSServerManager jmsServerManager, + final DeploymentManager deploymentManager) + { + super(deploymentManager); + + this.jmsServerManager = jmsServerManager; + + parser = new JMSServerConfigParserImpl(); + } + + /** + * The names of the elements this deployer handles. + * + * @return the queue, topic and connection-factory element names, in that order + */ + 
@Override + public String[] getElementTagName() + { + return new String[]{JMSServerDeployer.QUEUE_NODE_NAME, + JMSServerDeployer.TOPIC_NODE_NAME, + JMSServerDeployer.CONNECTION_FACTORY_NODE_NAME}; + } + /** Delegates to XMLUtil.validate, passing rootNode and the schema/hornetq-jms.xsd schema path. */ + @Override + public void validate(final Node rootNode) throws Exception + { + org.apache.activemq6.utils.XMLUtil.validate(rootNode, "schema/hornetq-jms.xsd"); + } + + /** + * Deploys an element by handing it to createAndBindObject. + * + * @param node the element to deploy + * @throws Exception whatever createAndBindObject throws + */ + @Override + public void deploy(final Node node) throws Exception + { + createAndBindObject(node); + } + + /** + * Dispatches on the node name: a connection-factory, queue or topic node goes to + * deployConnectionFactory, deployQueue or deployTopic; any other node is ignored. + * + * @param node the config + * @throws Exception whatever the selected deploy method throws + */ + private void createAndBindObject(final Node node) throws Exception + { + if (node.getNodeName().equals(JMSServerDeployer.CONNECTION_FACTORY_NODE_NAME)) + { + deployConnectionFactory(node); + } + else if (node.getNodeName().equals(JMSServerDeployer.QUEUE_NODE_NAME)) + { + deployQueue(node); + } + else if (node.getNodeName().equals(JMSServerDeployer.TOPIC_NODE_NAME)) + { + deployTopic(node); + } + } + + /** + * Undeploys an element. 
+ * + * @param node the element to undeploy + * @throws Exception + */ + @Override + public void undeploy(final Node node) throws Exception + { /* The name handed to the JMSServerManager is the value of the key attribute of the node; a node that is not a connection factory, queue or topic is ignored. */ + if (node.getNodeName().equals(JMSServerDeployer.CONNECTION_FACTORY_NODE_NAME)) + { + String cfName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue(); + jmsServerManager.destroyConnectionFactory(cfName); + } + else if (node.getNodeName().equals(JMSServerDeployer.QUEUE_NODE_NAME)) + { + String queueName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue(); + jmsServerManager.removeQueueFromJNDI(queueName); + } + else if (node.getNodeName().equals(JMSServerDeployer.TOPIC_NODE_NAME)) + { + String topicName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue(); + jmsServerManager.removeTopicFromJNDI(topicName); + } + } + /** Returns the single default configuration file name, hornetq-jms.xml. */ + @Override + public String[] getDefaultConfigFileNames() + { + return new String[]{"hornetq-jms.xml"}; + } + + + /** Parses the node as a topic and creates it through jmsServerManager.createTopic with the parsed name and bindings; the leading false argument is passed as-is and its meaning is defined by JMSServerManager. + * @param node the topic element + * @throws Exception if parsing or creation fails + */ + private void deployTopic(final Node node) throws Exception + { + TopicConfiguration topicConfig = parser.parseTopicConfiguration(node); + jmsServerManager.createTopic(false, topicConfig.getName(), topicConfig.getBindings()); + } + + /** Parses the node as a queue and creates it through jmsServerManager.createQueue with the parsed name, selector, durable flag and bindings; the leading false argument is passed as-is and its meaning is defined by JMSServerManager. + * @param node the queue element + * @throws Exception if parsing or creation fails + */ + private void deployQueue(final Node node) throws Exception + { + JMSQueueConfiguration queueconfig = parser.parseQueueConfiguration(node); + jmsServerManager.createQueue(false, queueconfig.getName(), queueconfig.getSelector(), queueconfig.isDurable(), queueconfig.getBindings()); + } + + /** Parses the node as a connection factory and creates it through jmsServerManager.createConnectionFactory with the parsed configuration and its bindings; the leading false argument is passed as-is and its meaning is defined by JMSServerManager. + * @param node the connection-factory element + * @throws Exception if parsing or creation fails + */ + private void deployConnectionFactory(final Node node) throws Exception + { + ConnectionFactoryConfiguration cfConfig = parser.parseConnectionFactoryConfiguration(node); + jmsServerManager.createConnectionFactory(false, cfConfig, cfConfig.getBindings()); + } + + +}
