http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSProducer.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSProducer.java b/core/src/flex/messaging/services/messaging/adapters/JMSProducer.java deleted file mode 100644 index 557163c..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSProducer.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import flex.messaging.MessageException; -import flex.messaging.config.ConfigurationException; -import flex.messaging.log.Log; -import flex.messaging.messages.MessagePerformanceUtils; - -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.naming.NamingException; - -import java.io.Serializable; -import java.lang.reflect.Field; -import java.util.Iterator; -import java.util.Map; - -/** - * A JMSProxy subclass for <code>javax.jms.MessageProducer</code> instances. 
- * - * - */ -public abstract class JMSProducer extends JMSProxy -{ - /* JMS related variables */ - protected MessageProducer producer; - - protected int deliveryMode; - protected int messagePriority; - protected String messageType; - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Create a new JMSProducer with default delivery mode of <code>javax.jms.Message.DEFAULT_DELIVERY_MODE</code> - * and default message priority of <code>javax.jms.Message.DEFAULT_PRIORITY</code>. - */ - public JMSProducer() - { - super(); - deliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; - messagePriority = javax.jms.Message.DEFAULT_PRIORITY; - } - - //-------------------------------------------------------------------------- - // - // Initialize, validate, start, and stop methods. - // - //-------------------------------------------------------------------------- - - /** - * Initialize with settings from the JMS adapter. - * - * @param settings JMS settings to use for initialization. - */ - public void initialize(JMSSettings settings) - { - super.initialize(settings); - - String deliveryString = settings.getDeliveryMode(); - if (deliveryString.equals(JMSConfigConstants.DEFAULT_DELIVERY_MODE)) - deliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; - else if (deliveryString.equals(JMSConfigConstants.PERSISTENT)) - deliveryMode = javax.jms.DeliveryMode.PERSISTENT; - else if (deliveryString.equals(JMSConfigConstants.NON_PERSISTENT)) - deliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT; - - messagePriority = settings.getMessagePriority(); - messageType = settings.getMessageType(); - } - - /** - * Verifies that the <code>JMSProducer</code> is in valid state before - * it is started. For <code>JMSProducer</code> to be in valid state, it needs - * to have a message type assigned. 
- */ - protected void validate() - { - super.validate(); - - if (messageType == null || !(messageType.equals(JMSConfigConstants.TEXT_MESSAGE) - || messageType.equals(JMSConfigConstants.OBJECT_MESSAGE) - || messageType.equals(JMSConfigConstants.MAP_MESSAGE)) ) - { - // Unsupported JMS Message Type ''{0}''. Valid values are javax.jms.TextMessage, javax.jms.ObjectMessage, javax.jms.MapMessage. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(JMSConfigConstants.INVALID_JMS_MESSAGE_TYPE, new Object[] {messageType}); - throw ce; - } - } - - /** - * Starts the <code>JMSProducer</code>. Subclasses should call <code>super.start</code>. - */ - public void start() throws NamingException, JMSException - { - super.start(); - - if (Log.isInfo()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS producer for JMS destination '" - + destinationJndiName +"' is starting."); - } - - /** - * Stops the <code>JMSProducer</code> by closing its underlying - * <code>MessageProducer</code>. It then calls <code>JMSProxy.close</code> - * for session and connection closure. - */ - public void stop() - { - if (Log.isInfo()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS producer for JMS destination '" + - destinationJndiName + "' is stopping."); - - try - { - if (producer != null) - producer.close(); - } - catch (JMSException e) - { - if (Log.isWarn()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS producer for JMS destination '" + - destinationJndiName + "' received an error while closing" - + " its underlying MessageProducer: " + e.getMessage()); - } - - super.stop(); - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Returns the delivery mode used by the <code>JMSProducer</code>. - * - * @return The delivery mode used by the <code>JMSProducer</code>. 
- */ - public int getDeliveryMode() - { - return deliveryMode; - } - - /** - * Sets the delivery mode used by the <code>JMSProducer</code>. Valid values - * javax.jms.DeliveryMode.PERSISTENT and javax.jms.DeliveryMode.NON_PERSISTENT. - * This propery is optional and defaults to javax.jms.Message.DEFAULT_DELIVERY_MODE. - * - * @param deliveryMode - */ - public void setDeliveryMode(int deliveryMode) - { - if (deliveryMode == javax.jms.Message.DEFAULT_DELIVERY_MODE - || deliveryMode == javax.jms.DeliveryMode.NON_PERSISTENT - || deliveryMode == javax.jms.DeliveryMode.PERSISTENT) - this.deliveryMode = deliveryMode; - } - - /** - * Returns the message priority used by the <code>JMSProducer</code>. - * - * @return an int specifying the message priority. - */ - public int getMessagePriority() - { - return messagePriority; - } - - /** - * Sets the message priority used by the <code>JMSProducer</code>. - * This property is optional and defaults to <code>javax.jms.Message.DEFAULT_PRIORITY</code>. - * - * @param messagePriority an int specifying the message priority. - */ - public void setMessagePriority(int messagePriority) - { - this.messagePriority = messagePriority; - } - - /** - * Returns the message type used by the <code>JMSProducer</code>. - * - * @return The message type used by the <code>JMSProducer</code>. - */ - public String getMessageType() - { - return messageType; - } - - /** - * Sets the message type used by the <code>JMSProducer</code>. Supported - * types are <code>javax.jms.TextMessage</code> and <code>javax.jms.ObjectMessage</code>. - * This property is mandatory. - * - * @param messageType String representing the message type used. 
- */ - public void setMessageType(String messageType) - { - this.messageType = messageType; - } - - //-------------------------------------------------------------------------- - // - // Protected and Private Methods - // - //-------------------------------------------------------------------------- - - protected void copyHeadersToProperties(Map properties, javax.jms.Message message) throws JMSException - { - // Generic Flex headers become JMS properties, named Flex headers become JMS headers - for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) - { - String propName = (String)iter.next(); - Object propValue = properties.get(propName); - - // For now, only named property is TTL. - if (!propName.equals(JMSConfigConstants.TIME_TO_LIVE)) - { - // MPI header contains a MessagePerformaceInfo object that cannot - // be set as a JMS header property. Instead, it is broken down - // to its primitive types and each primitive is individually - // set as a JMS header property. - if (propName.equals(MessagePerformanceUtils.MPI_HEADER_IN)) - { - Field[] fields = propValue.getClass().getFields(); - for (int i = 0; i < fields.length; i++) - { - Field field = fields[i]; - // Use MPI_HEADER_IN as prefix to the property name so that - // they can be distinguished later when the MessagePerformanceInfo - // object gets built from the headers. 
- String mpiPropertyName = MessagePerformanceUtils.MPI_HEADER_IN + field.getName(); - Object mpiPropertyValue = null; - try - { - mpiPropertyValue = field.get(propValue); - message.setObjectProperty(mpiPropertyName, mpiPropertyValue); - } - catch (Exception e) - { - if (Log.isWarn()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMSProducer could not retrieve the value of MessagePerformanceUtils property '" - + propValue + "' from the Flex message, therefore it will not be set on the JMS message."); - } - } - } - else if (propValue != null) - { - message.setObjectProperty(propName, propValue); - } - } - } - } - - protected long getTimeToLive(Map properties) throws JMSException - { - long timeToLive = producer.getTimeToLive(); - if (properties.containsKey(JMSConfigConstants.TIME_TO_LIVE)) - { - long l = ((Long)properties.get(JMSConfigConstants.TIME_TO_LIVE)).longValue(); - if (l != 0) - { - // Don't let Flex default override JMS implementation default, - // only explicit ActionScript TTL usage overrides JMS TTL. - timeToLive = l; - } - } - return timeToLive; - } - - void sendMessage(flex.messaging.messages.Message flexMessage) throws JMSException - { - MessagePerformanceUtils.markServerPreAdapterExternalTime(flexMessage); - - if (JMSConfigConstants.TEXT_MESSAGE.equals(messageType)) - { - sendTextMessage(flexMessage.getBody().toString(), flexMessage.getHeaders()); - } - else if (JMSConfigConstants.OBJECT_MESSAGE.equals(messageType)) - { - try - { - sendObjectMessage((Serializable)flexMessage.getBody(), flexMessage.getHeaders()); - } - catch (ClassCastException ce) - { - // The body of the Flex Message could not be converted to a Serializable Java Object. 
- MessageException me = new MessageException(); - me.setMessage(JMSConfigConstants.NONSERIALIZABLE_MESSAGE_BODY); - throw me; - } - } - else if (JMSConfigConstants.MAP_MESSAGE.equals(messageType)) - { - try - { - sendMapMessage((Map<String,?>)flexMessage.getBody(), flexMessage.getHeaders()); - } - catch (ClassCastException ce) - { - // 10812=The body of the Flex message could not be converted to a Java Map object. - MessageException me = new MessageException(); - me.setMessage(JMSConfigConstants.NONMAP_MESSAGE_BODY); - throw me; - } - } - } - - abstract void sendObjectMessage(Serializable obj, Map properties) throws JMSException; - - abstract void sendTextMessage(String text, Map properties) throws JMSException; - - abstract void sendMapMessage(Map<String,?> map, Map properties) throws JMSException; -}
/*
 * http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSProxy.java
 * ----------------------------------------------------------------------
 * diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSProxy.java b/core/src/flex/messaging/services/messaging/adapters/JMSProxy.java
 * deleted file mode 100644
 * index a009a6c..0000000
 * --- a/core/src/flex/messaging/services/messaging/adapters/JMSProxy.java
 * +++ /dev/null
 * @@ -1,394 +0,0 @@
 */
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package flex.messaging.services.messaging.adapters;

import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import flex.messaging.config.ConfigurationException;
import flex.messaging.log.Log;

/**
 * The JMSProxy is the superclass for all producers and consumers
 * on both topics and queues. This class contains shared behavior
 * between producers and consumers.
 */
public abstract class JMSProxy
{
    /* JMS related variables */
    protected Connection connection;
    protected ConnectionCredentials connectionCredentials;
    protected ConnectionFactory connectionFactory;
    protected Session session;
    protected Destination destination;
    protected Context jndiContext;

    protected int acknowledgeMode;
    protected String connectionFactoryName;
    protected String destinationJndiName;
    protected Hashtable initialContextEnvironment;

    //--------------------------------------------------------------------------
    //
    // Constructor
    //
    //--------------------------------------------------------------------------

    /**
     * Creates a new <code>JMSProxy</code> with default acknowledge mode of
     * <code>javax.jms.Session.AUTO_ACKNOWLEDGE</code>.
     */
    public JMSProxy()
    {
        acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE;
    }

    //--------------------------------------------------------------------------
    //
    // Initialize, validate, start, and stop methods.
    //
    //--------------------------------------------------------------------------

    /**
     * Initialize with settings from the JMS adapter. The acknowledge mode
     * string is mapped to the matching <code>javax.jms.Session</code> constant;
     * a missing or unrecognized string leaves the current mode untouched.
     * Connection credentials are created only when a username or a password
     * is present in the settings.
     *
     * @param settings JMS settings to use for initialization.
     */
    public void initialize(JMSSettings settings)
    {
        String ackString = settings.getAcknowledgeMode();
        // Guard against an unset acknowledge mode instead of failing with a NullPointerException.
        if (ackString != null)
        {
            if (ackString.equals(JMSConfigConstants.AUTO_ACKNOWLEDGE))
                acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
            else if (ackString.equals(JMSConfigConstants.CLIENT_ACKNOWLEDGE))
                acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
            else if (ackString.equals(JMSConfigConstants.DUPS_OK_ACKNOWLEDGE))
                acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
        }

        connectionFactoryName = settings.getConnectionFactory();
        String username = settings.getConnectionUsername();
        String password = settings.getConnectionPassword();
        if (username != null || password != null)
        {
            connectionCredentials = new ConnectionCredentials(username, password);
        }
        destinationJndiName = settings.getDestinationJNDIName();
        initialContextEnvironment = settings.getInitialContextEnvironment();
    }

    /**
     * Verifies that the <code>JMSProxy</code> is in valid state before
     * it is started. For <code>JMSProxy</code> to be in valid state, it needs
     * to have a connection factory name and destination jndi name assigned.
     *
     * @throws ConfigurationException if either name is missing.
     */
    protected void validate()
    {
        if (connectionFactoryName == null)
        {
            // JMS connection factory of message destinations with JMS Adapters must be specified.
            ConfigurationException ce = new ConfigurationException();
            ce.setMessage(JMSConfigConstants.MISSING_CONNECTION_FACTORY);
            throw ce;
        }

        if (destinationJndiName == null)
        {
            // JNDI names for message destinations with JMS Adapters must be specified.
            ConfigurationException ce = new ConfigurationException();
            ce.setMessage(JMSConfigConstants.MISSING_DESTINATION_JNDI_NAME);
            throw ce;
        }
    }

    /**
     * Starts the <code>JMSProxy</code>. The default implementation verifies
     * that <code>JMSProxy</code> is in a valid state to be started and then
     * initializes JNDI context, connection factory and destination for JMS.
     * Subclasses should call <code>super.start</code>.
     *
     * @throws NamingException The thrown naming exception.
     * @throws JMSException The thrown JMS exception.
     */
    public void start() throws NamingException, JMSException
    {
        validate();
        initializeJndiContext();
        initializeConnectionFactory();
        initializeDestination();
    }

    /**
     * Stops the <code>JMSProxy</code> by closing its associated session,
     * connection and JNDI context, in that order. Each close is attempted
     * independently; a failure is logged as a warning and does not prevent
     * the remaining resources from being closed.
     */
    public void stop()
    {
        try
        {
            if (session != null)
                session.close();
        }
        catch (JMSException e)
        {
            if (Log.isWarn())
                Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS proxy for JMS destination '"
                        + destinationJndiName + "' received an error while closing"
                        + " its underlying Session: " + e.getMessage());
        }

        try
        {
            if (connection != null)
                connection.close();
        }
        catch (JMSException e)
        {
            if (Log.isWarn())
                Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS proxy for JMS destination '"
                        + destinationJndiName + "' received an error while closing"
                        + " its underlying Connection: " + e.getMessage());
        }

        try
        {
            if (jndiContext != null)
                jndiContext.close();
        }
        catch (NamingException e)
        {
            if (Log.isWarn())
                Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS proxy for JMS destination '"
                        + destinationJndiName + "' received an error while closing"
                        + " its underlying JNDI context: " + e.getMessage());
        }
    }

    //--------------------------------------------------------------------------
    //
    // Public Methods
    //
    //--------------------------------------------------------------------------

    /**
     * Returns the acknowledge mode used by the <code>JMSProxy</code>.
     *
     * @return The acknowledge mode used by the <code>JMSProxy</code>.
     */
    public int getAcknowledgeMode()
    {
        return acknowledgeMode;
    }

    /**
     * Sets the acknowledge mode used by the <code>JMSProxy</code>. Valid values
     * are javax.jms.Session.AUTO_ACKNOWLEDGE, javax.jms.Session.CLIENT_ACKNOWLEDGE,
     * javax.jms.Session.DUPS_OK_ACKNOWLEDGE. This property is optional and
     * defaults to javax.jms.Session.AUTO_ACKNOWLEDGE. Any other value is
     * ignored and the current acknowledge mode is kept.
     *
     * @param acknowledgeMode An int representing the acknowledge mode used.
     */
    public void setAcknowledgeMode(int acknowledgeMode)
    {
        if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE
                || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE
                || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
            this.acknowledgeMode = acknowledgeMode;
    }

    /**
     * Returns the connection factory name.
     *
     * @return The connection factory name.
     */
    public String getConnectionFactoryName()
    {
        return connectionFactoryName;
    }

    /**
     * Sets the connection factory name. This property should not be changed
     * after startup.
     *
     * @param connectionFactoryName The connection factory name.
     */
    public void setConnectionFactoryName(String connectionFactoryName)
    {
        this.connectionFactoryName = connectionFactoryName;
    }

    /**
     * Returns the connection credentials used while creating JMS connections.
     *
     * @return The connection credentials used while creating JMS connections.
     */
    public ConnectionCredentials getConnectionCredentials()
    {
        return connectionCredentials;
    }

    /**
     * Sets the connection credentials. Connection credentials are passed to the JMS
     * connection factory when a JMS connection is created.
     *
     * @param connectionCredentials The connection credentials.
     */
    public void setConnectionCredentials(ConnectionCredentials connectionCredentials)
    {
        this.connectionCredentials = connectionCredentials;
    }

    /**
     * Returns the JNDI name of the JMS destination that <code>JMSProxy</code> talks to.
     *
     * @return The JNDI name of the JMS destination.
     */
    public String getDestinationJndiName()
    {
        return destinationJndiName;
    }

    /**
     * Sets the JNDI name of the JMS destination that <code>JMSProxy</code> talks to.
     *
     * @param destinationJndiName The JNDI name of the JMS destination.
     */
    public void setDestinationJndiName(String destinationJndiName)
    {
        this.destinationJndiName = destinationJndiName;
    }

    /**
     * Returns the <code>initial-context-environment</code> property.
     *
     * @return a Hashtable of the <code>initial-context-environment</code>.
     */
    public Hashtable getInitialContextEnvironment()
    {
        return initialContextEnvironment;
    }

    /**
     * Sets the <code>initial-context-environment</code> property. This property
     * is optional. This property should not be changed after startup.
     *
     * @param env A Hashtable of the <code>initial-context-environment</code>.
     */
    public void setInitialContextEnvironment(Hashtable env)
    {
        initialContextEnvironment = env;
    }

    //--------------------------------------------------------------------------
    //
    // Protected and Private Methods
    //
    //--------------------------------------------------------------------------

    /**
     * Initializes the connection factory needed for JMS by looking it up in
     * the JNDI context. The lookup is performed only once; later calls return
     * the cached factory.
     *
     * @return The connection factory.
     * @throws NamingException if the JNDI lookup fails.
     */
    protected ConnectionFactory initializeConnectionFactory() throws NamingException
    {
        if (connectionFactory == null)
            connectionFactory = (ConnectionFactory)jndiContext.lookup(connectionFactoryName);

        return connectionFactory;
    }

    /**
     * Initializes the destination (topic or queue) used by JMS by looking it
     * up in the JNDI context. The lookup is performed only once; later calls
     * return the cached destination.
     *
     * @return The destination.
     * @throws NamingException if the JNDI lookup fails.
     */
    protected Destination initializeDestination() throws NamingException
    {
        if (destination == null)
            destination = (Destination)jndiContext.lookup(destinationJndiName);

        return destination;
    }

    /**
     * Initializes the JNDI context needed by JMS. This should be called before
     * any other initialize methods. If a context already exists, the proxy is
     * stopped first so the previous resources are closed before a new context
     * is created.
     *
     * @return The newly created JNDI context.
     * @throws NamingException if the initial context cannot be created.
     */
    protected Context initializeJndiContext() throws NamingException
    {
        if (jndiContext != null)
            stop();

        if (initialContextEnvironment != null)
            jndiContext = new InitialContext(initialContextEnvironment);
        else
            jndiContext = new InitialContext();

        return jndiContext;
    }

    //--------------------------------------------------------------------------
    //
    // Nested Classes
    //
    //--------------------------------------------------------------------------

    /**
     * A static nested class for connection credentials that are passed to the JMS
     * connection factory when a JMS connection is created.
     */
    public static class ConnectionCredentials
    {
        private String username;
        private String password;

        /**
         * Creates a <code>ConnectionCredentials</code> instance with the supplied
         * username and password.
         *
         * @param username Username of the credential.
         * @param password Password of the credential.
         */
        public ConnectionCredentials(String username, String password)
        {
            this.username = username;
            this.password = password;
        }

        /**
         * Returns the username being used.
         *
         * @return The username being used.
         */
        public String getUsername()
        {
            return username;
        }

        /**
         * Returns the password being used.
         *
         * @return The password being used.
         */
        public String getPassword()
        {
            return password;
        }
    }
}

/*
 * http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSQueueConsumer.java
 * ----------------------------------------------------------------------
 * diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSQueueConsumer.java b/core/src/flex/messaging/services/messaging/adapters/JMSQueueConsumer.java
 * deleted file mode 100644
 * index e1620be..0000000
 * --- a/core/src/flex/messaging/services/messaging/adapters/JMSQueueConsumer.java
 * +++ /dev/null
 * @@ -1,96 +0,0 @@
 */
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package flex.messaging.services.messaging.adapters;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.naming.NamingException;

import flex.messaging.MessageException;

/**
 * A <code>JMSConsumer</code> subclass specifically for JMS Queue receivers.
 */
public class JMSQueueConsumer extends JMSConsumer
{
    //--------------------------------------------------------------------------
    //
    // Initialize, validate, start, and stop methods.
    //
    //--------------------------------------------------------------------------

    /**
     * Starts the <code>JMSQueueConsumer</code>: creates the queue connection
     * and session, creates a receiver on the queue (with the selector
     * expression when one is set) and starts the message receiver.
     *
     * @throws NamingException The thrown naming exception.
     * @throws JMSException The thrown JMS exception.
     * @throws MessageException if the destination is not a queue or the
     * connection factory is not a queue connection factory.
     */
    public void start() throws NamingException, JMSException
    {
        super.start();

        // Establish queue. The type is checked explicitly rather than by
        // catching ClassCastException.
        if (destination != null && !(destination instanceof Queue))
        {
            // JMS queue proxy for JMS destination ''{0}'' has a destination type of ''{1}'' which is not Queue.
            MessageException me = new MessageException();
            me.setMessage(JMSConfigConstants.NON_QUEUE_DESTINATION, new Object[] {destinationJndiName, destination.getClass().getName()});
            throw me;
        }
        Queue queue = (Queue)destination;

        // Create connection. Checking the factory type up front keeps a
        // ClassCastException raised inside the provider's createQueueConnection
        // from being misreported as a wrong factory type.
        if (connectionFactory != null && !(connectionFactory instanceof QueueConnectionFactory))
        {
            // JMS queue proxy for JMS destination ''{0}'' has a connection factory type of ''{1}'' which is not QueueConnectionFactory.
            MessageException me = new MessageException();
            me.setMessage(JMSConfigConstants.NON_QUEUE_FACTORY, new Object[] {destinationJndiName, connectionFactory.getClass().getName()});
            throw me;
        }
        QueueConnectionFactory queueFactory = (QueueConnectionFactory)connectionFactory;
        if (connectionCredentials != null)
            connection = queueFactory.createQueueConnection(connectionCredentials.getUsername(), connectionCredentials.getPassword());
        else
            connection = queueFactory.createQueueConnection();

        QueueConnection queueConnection = (QueueConnection)connection;

        // Create queue session on the connection
        session = queueConnection.createQueueSession(false, getAcknowledgeMode());

        // Create receiver on the queue session
        QueueSession queueSession = (QueueSession) session;

        // Handle message selectors
        if (selectorExpression != null)
            consumer = queueSession.createReceiver(queue, selectorExpression);
        else
            consumer = queueSession.createReceiver(queue);

        startMessageReceiver();
    }
}

/*
 * http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSQueueProducer.java
 * ----------------------------------------------------------------------
 * diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSQueueProducer.java b/core/src/flex/messaging/services/messaging/adapters/JMSQueueProducer.java
 * deleted file mode 100644
 * index 7b77ee8..0000000
 * --- a/core/src/flex/messaging/services/messaging/adapters/JMSQueueProducer.java
 * +++ /dev/null
 * @@ -1,132 +0,0 @@
 */
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package flex.messaging.services.messaging.adapters;

import java.io.Serializable;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.NamingException;

import flex.messaging.MessageException;

/**
 * A <code>JMSProducer</code> subclass specifically for JMS Queue senders.
 */
public class JMSQueueProducer extends JMSProducer
{
    /* JMS related variables */
    private QueueSender sender;

    /**
     * Starts <code>JMSQueueProducer</code>: creates the queue connection and
     * session, creates a sender on the queue and starts the connection.
     *
     * @throws NamingException The thrown naming exception.
     * @throws JMSException The thrown JMS exception.
     * @throws MessageException if the destination is not a queue or the
     * connection factory is not a queue connection factory.
     */
    public void start() throws NamingException, JMSException
    {
        super.start();

        // Establish queue. The type is checked explicitly rather than by
        // catching ClassCastException.
        if (destination != null && !(destination instanceof Queue))
        {
            // JMS queue proxy for JMS destination ''{0}'' has a destination type of ''{1}'' which is not Queue.
            MessageException me = new MessageException();
            me.setMessage(JMSConfigConstants.NON_QUEUE_DESTINATION, new Object[] {destinationJndiName, destination.getClass().getName()});
            throw me;
        }
        Queue queue = (Queue)destination;

        // Create connection. Checking the factory type up front keeps a
        // ClassCastException raised inside the provider's createQueueConnection
        // from being misreported as a wrong factory type.
        if (connectionFactory != null && !(connectionFactory instanceof QueueConnectionFactory))
        {
            // JMS queue proxy for JMS destination ''{0}'' has a connection factory type of ''{1}'' which is not QueueConnectionFactory.
            MessageException me = new MessageException();
            me.setMessage(JMSConfigConstants.NON_QUEUE_FACTORY, new Object[] {destinationJndiName, connectionFactory.getClass().getName()});
            throw me;
        }
        QueueConnectionFactory queueFactory = (QueueConnectionFactory)connectionFactory;
        if (connectionCredentials != null)
            connection = queueFactory.createQueueConnection(connectionCredentials.getUsername(), connectionCredentials.getPassword());
        else
            connection = queueFactory.createQueueConnection();

        // Create queue session on the connection
        QueueConnection queueConnection = (QueueConnection) connection;
        session = queueConnection.createQueueSession(false, getAcknowledgeMode());

        // Create sender on the queue session
        QueueSession queueSession = (QueueSession)session;
        sender = queueSession.createSender(queue);
        producer = sender;

        // Start the connection
        connection.start();
    }

    /**
     * Sends the text as a JMS text message. A null text is silently ignored.
     */
    @Override
    void sendTextMessage(String text, Map properties) throws JMSException
    {
        if (text == null)
            return;

        TextMessage message = session.createTextMessage();
        message.setText(text);
        send(message, properties);
    }

    /**
     * Sends the object as a JMS object message. A null object is silently ignored.
     */
    @Override
    void sendObjectMessage(Serializable obj, Map properties) throws JMSException
    {
        if (obj == null)
            return;

        ObjectMessage message = session.createObjectMessage();
        message.setObject(obj);
        send(message, properties);
    }

    /**
     * Sends the map entries as a JMS map message. A null map is silently ignored.
     */
    @Override
    void sendMapMessage(Map<String, ?> map, Map properties) throws JMSException
    {
        if (map == null)
            return;

        MapMessage message = session.createMapMessage();
        for (Map.Entry<String, ?> entry : map.entrySet())
            message.setObject(entry.getKey(), entry.getValue());
        send(message, properties);
    }

    /**
     * Copies the Flex headers onto the JMS message and sends it through the
     * queue sender with the configured delivery mode, priority and time-to-live.
     */
    private void send(javax.jms.Message message, Map properties) throws JMSException
    {
        copyHeadersToProperties(properties, message);
        sender.send(message, getDeliveryMode(), messagePriority, getTimeToLive(properties));
    }
}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSSettings.java ---------------------------------------------------------------------- diff --git
a/core/src/flex/messaging/services/messaging/adapters/JMSSettings.java b/core/src/flex/messaging/services/messaging/adapters/JMSSettings.java deleted file mode 100644 index 0515884..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSSettings.java +++ /dev/null @@ -1,602 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import java.util.Hashtable; - -import flex.messaging.config.ConfigurationException; - -/** - * Settings for <code>JMSAdapter</code>. 
- */ -public class JMSSettings -{ - private String acknowledgeMode; - private String connectionFactory; - private String connectionUsername; - private String connectionPassword; - private String deliveryMode; - private String destinationJNDIName; - private DeliverySettings deliverySettings; - private String destinationType; - private boolean durableConsumers; - private Hashtable initialContextEnvironment; - private int maxProducers; - private int messagePriority; - private String messageType; - private boolean preserveJMSHeaders; - - /** - * Creates a <code>JMSSettings</code> instance with the following default - * values: acknowledge mode of AUTO_ACKNOWLEDGE, delivery mode of - * DEFAULT_DELIVERY_MODE, destination type of Topic, and default - * delivery setting. - */ - public JMSSettings() - { - acknowledgeMode = JMSConfigConstants.AUTO_ACKNOWLEDGE; - deliveryMode = JMSConfigConstants.DEFAULT_DELIVERY_MODE; - destinationType = JMSConfigConstants.TOPIC; - deliverySettings = new DeliverySettings(); - maxProducers = JMSConfigConstants.defaultMaxProducers; - messagePriority = javax.jms.Message.DEFAULT_PRIORITY; - preserveJMSHeaders = JMSConfigConstants.defaultPreserveJMSHeaders; - } - - /** - * Returns the <code>acknowledge-mode</code> property. - * - * @return a String containing the <code>acknowledge-mode</code> - */ - public String getAcknowledgeMode() - { - return acknowledgeMode; - } - - /** - * Sets the <code>acknowledge-mode</code> property which is the message - * acknowledgement mode for the JMS adapter. None of these modes require any - * action on the part of the Flex messaging client. This property is optional - * and defautls to AUTO_ACKNOWLEDGE. - * - * @param mode Message acknowledgement mode. Supported modes are: - * AUTO_ACKNOWLEDGE - the JMS provider client runtime automatically acknowledges the messages. - * DUPS_OK_ACKNOWLEDGE - auto-acknowledgement of the messages is not required. 
- * CLIENT_ACKNOWLEDGE - the JMS adapter should acknowledge that the message was received. - * - */ - public void setAcknowledgeMode(String mode) - { - if (mode == null) - { - acknowledgeMode = JMSConfigConstants.defaultAcknowledgeMode; - return; - } - - mode = mode.toLowerCase(); - - if (!(mode.equals(JMSConfigConstants.AUTO_ACKNOWLEDGE) - || mode.equals(JMSConfigConstants.DUPS_OK_ACKNOWLEDGE) - || mode.equals(JMSConfigConstants.CLIENT_ACKNOWLEDGE)) ) - { - // Invalid Acknowledge Mode ''{0}''. Valid values are AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE, and CLIENT_ACKNOWLEDGE. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(JMSConfigConstants.INVALID_ACKNOWLEDGE_MODE, new Object[] {mode}); - throw ce; - } - acknowledgeMode = mode; - } - - /** - * Returns the <code>connection-factory</code> property. - * - * @return a String containing the <code>connection-factory</code>. - */ - public String getConnectionFactory() - { - return connectionFactory; - } - - /** - * Sets the <code>connection-factory</code> property which is the name of - * the JMS connection factory in JNDI. This property is required and it - * cannot be null. - * - * @param factory The non-null name of the JMS connection factory. - */ - public void setConnectionFactory(String factory) - { - if (factory == null) - { - // JMS connection factory of message destinations with JMS Adapters must be specified. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(JMSConfigConstants.MISSING_CONNECTION_FACTORY); - throw ce; - } - connectionFactory = factory; - } - - /** - * Returns the connection username used while creating JMS connections. - * - * @return The connection username used while creating JMS connections. - */ - public String getConnectionUsername() - { - return connectionUsername; - } - - /** - * Sets the connection username used while creating JMS connections. 
- * This is optional and only needed when connection level JMS authentication - * is being used. - * - * @param connectionUsername The connection username used while creating JMS connections. - */ - public void setConnectionUsername(String connectionUsername) - { - this.connectionUsername = connectionUsername; - } - - /** - * Returns the connection password used while creating JMS connections. - * - * @return The connection password used while creating JMS connections. - */ - public String getConnectionPassword() - { - return connectionPassword; - } - - /** - * Sets the connection password used while creating JMS connections. - * This is optional and only needed when connection level JMS authentication - * is being used. - * - * @param connectionPassword The connection password used while creating JMS connections. - */ - public void setConnectionPassword(String connectionPassword) - { - this.connectionPassword = connectionPassword; - } - - /** - * Returns the <code>delivery-mode</code> property. - * - * @return a String containing the <code>delivery-mode</code>. - */ - public String getDeliveryMode() - { - return deliveryMode; - } - - /** - * Sets the <code>delivery-mode</code> property which is the JMS DeliveryMode - * for producers. This property optional and defaults to DEFAULT_DELIVERY_MODE. - * - * @param mode The delivery mode. Valid values are DEFAULT_DELIVERY_MODE, - * PERSISTENT, and NON_PERSISTENT. - */ - public void setDeliveryMode(String mode) - { - if (mode == null) - { - deliveryMode = JMSConfigConstants.DEFAULT_DELIVERY_MODE; - return; - } - - mode = mode.toLowerCase(); - - if (!(mode.equals(JMSConfigConstants.DEFAULT_DELIVERY_MODE) - || mode.equals(JMSConfigConstants.PERSISTENT) - || mode.equals(JMSConfigConstants.NON_PERSISTENT))) - { - // Invalid Delivery Mode ''{0}''. Valid values are DEFAULT_DELIVERY_MODE, PERSISTENT, and NON_PERSISTENT. 
- ConfigurationException ce = new ConfigurationException(); - ce.setMessage(JMSConfigConstants.INVALID_DELIVERY_MODE, new Object[] {mode}); - throw ce; - } - deliveryMode = mode; - } - - /** - * Returns the <code>delivery-settings</code> property. - * - * @return The <code>delivery-settings</code> property. - */ - public DeliverySettings getDeliverySettings() - { - return deliverySettings; - } - - /** - * Sets the <code>delivery-settings</code> property. This property is - * optional and defaults to default settings as described in - * <code>DeliverySettings</code> inner class. - * - * @param deliverySettings The <code>delivery-settings</code> property. - */ - public void setDeliverySettings(DeliverySettings deliverySettings) - { - this.deliverySettings = deliverySettings; - } - - /** - * Returns the <code>destination-jndi-name</code> property. - * - * @return a String containing the <code>destination-jndi-name</code> - */ - public String getDestinationJNDIName() - { - return destinationJNDIName; - } - - /** - * Sets the <code>destination-jndi-name</code> property which is the name of - * the destination in JNDI. This value is required and it cannot be null. - * - * @param name The non-null name of the destination in JNDI. - */ - public void setDestinationJNDIName(String name) - { - if (name == null) - { - // JNDI names for message destinations with JMS Adapters must be specified. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(JMSConfigConstants.MISSING_DESTINATION_JNDI_NAME); - throw ce; - } - destinationJNDIName = name; - } - - /** - * Destination-name property is not used anymore. - * - * @deprecated - * @return null. - */ - public String getDestinationName() - { - return null; - } - - /** - * Destination-name property is not used anymore. - * - * @deprecated - * @param name The name of the destination. 
- */ - public void setDestinationName(String name) - { - // No-op - } - - /** - * Returns the <code>destination-type</code> property. - * - * @return a String containing the <code>destination-type</code>. - */ - public String getDestinationType() - { - return destinationType; - } - - /** - * Sets the <code>destination-type</code> property which determines whether - * the adapter is performing topic (pub/sub) or queue (point-to-point) - * messaging. This element is optional and defaults to Topic. - * - * @param type The destination type. Valid values are Topic and Queue. - */ - public void setDestinationType(String type) - { - if (type == null) - { - destinationType = JMSConfigConstants.defaultDestinationType; - return; - } - - type = type.toLowerCase(); - - if (!(type.equals(JMSConfigConstants.TOPIC) || type.equals(JMSConfigConstants.QUEUE))) - { - // JMS Adapter destination type must be Topic or Queue. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(JMSConfigConstants.INVALID_DESTINATION_TYPE); - throw ce; - } - destinationType = type; - } - - /** - * Returns whether consumers are durable or not. - * - * @return <code>true</code> is consumers are durable, <code>false</code> - * otherwise. - */ - public boolean useDurableConsumers() - { - return durableConsumers; - } - - /** - * Sets whethers consumers are durable or not. This property is optional - * and defaults to false. - * - * @param durable A boolean indicating whether consumers should be durable. - */ - public void setDurableConsumers(boolean durable) - { - durableConsumers = durable; - } - - /** - * Returns the <code>initial-context-environment</code> property. - * - * @return a Hashtable of the <code>initial-context-environment</code>. - */ - public Hashtable getInitialContextEnvironment() - { - return initialContextEnvironment; - } - - /** - * Sets the <code>initial-context-environment</code> property. This property - * is optional. 
- * - * @param env A Hashtable of the <code>initial-context-environment</code>. - */ - public void setInitialContextEnvironment(Hashtable env) - { - initialContextEnvironment = env; - } - - /** - * Returns the <code>max-producers</code> property. - * - * @return an int representing the <code>max-producers</code>. - */ - public int getMaxProducers() - { - return maxProducers; - } - - /** - * Sets the <code>max-producers</code> property which is the maximum number - * of producer proxies that this destination should use when communicating - * with the JMS Server. This property is optional and defaults to 1 which - * implies all clients using this destinatin will share the same connection - * to the JMS server. - * - * @param value an int representing the <code>max-producers</code>. - */ - public void setMaxProducers(int value) - { - if (value < 1) - value = JMSConfigConstants.defaultMaxProducers; - maxProducers = value; - } - - /** - * Returns the <code>message-priority</code> property. - * - * @return an int specifying the <code>message-priority</code> - */ - public int getMessagePriority() - { - return messagePriority; - } - - /** - * Sets the <code>message-priority</code> property which is the JMS priority - * for messages sent by Flex producers. This property is optional and - * defaults to <code>javax.jms.Message.DEFAULT_PRIORITY</code>. - * - * @param priority an int specifying the <code>message-priority</code>. - */ - public void setMessagePriority(int priority) - { - messagePriority = priority; - } - - /** - * Returns the <code>message-type</code> property. - * - * @return a String containing the <code>message-type</code>. - */ - public String getMessageType() - { - return messageType; - } - - /** - * Sets the <code>message-type</code> property which is the - * <code>javax.jms.Message</code> type which the adapter should use for the - * destination. - * - * @param type The <code>message-type</code> property. 
Supported types are - * <code>javax.jms.TextMessage</code>, <code>javax.jms.ObjectMessage</code>, - * and <code>javax.jms.MapMessage</code>. - */ - public void setMessageType(String type) - { - if (type == null || !(type.equals(JMSConfigConstants.TEXT_MESSAGE) - || type.equals(JMSConfigConstants.OBJECT_MESSAGE) - || type.equals(JMSConfigConstants.MAP_MESSAGE)) ) - { - // Unsupported JMS Message Type ''{0}''. Valid values are javax.jms.TextMessage and javax.jms.ObjectMessage. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(JMSConfigConstants.INVALID_JMS_MESSAGE_TYPE, new Object[] {type}); - throw ce; - } - messageType = type; - } - - /** - * Returns the <code>preserve-jms-headers</code> property. - * - * @return The <code>preserve-jms-headers</code> property. - */ - public boolean isPreserveJMSHeaders() - { - return preserveJMSHeaders; - } - - /** - * Sets the <code>preserve-jms-headers</code> property. This property is - * optional and defaults to true. - * - * @param preserveJMSHeaders The <code>preserve-jms-headers</code> property. - */ - public void setPreserveJMSHeaders(boolean preserveJMSHeaders) - { - this.preserveJMSHeaders = preserveJMSHeaders; - } - - /** - * Transacted-session property is not used anymore. - * - * @deprecated - * @return false. - */ - public boolean isTransactedSessions() - { - return false; - } - - /** - * Transacted-session property is not used anymore. - * - * @deprecated - * @param mode The transacted-session property. - */ - public void setTransactedSessions(boolean mode) - { - // No-op - } - - //-------------------------------------------------------------------------- - // - // Nested Classes - // - //-------------------------------------------------------------------------- - - /** - * A static inner class for delivery settings. 
- */ - public static class DeliverySettings - { - private String mode; - private long syncReceiveIntervalMillis; - private long syncReceiveWaitMillis; - - /** - * Creates a default <code>DeliverySettings</code> instance with default - * settings. - */ - public DeliverySettings() - { - mode = JMSConfigConstants.SYNC; - syncReceiveIntervalMillis = JMSConfigConstants.defaultSyncReceiveIntervalMillis; - syncReceiveWaitMillis = JMSConfigConstants.defaultSyncReceiveWaitMillis; - } - - /** - * Returns the message delivery mode. - * - * @return The message delivery mode. - */ - public String getMode() - { - return mode; - } - /** - * Sets the message delivery mode. This property is optional and defaults - * to sync. - * - * @param mode The message delivery mode. Valid values are async and sync. - */ - public void setMode(String mode) - { - if (mode == null) - { - mode = JMSConfigConstants.defaultMode; - return; - } - - mode = mode.toLowerCase(); - - if (!(mode.equals(JMSConfigConstants.ASYNC) || mode.equals(JMSConfigConstants.SYNC))) - { - // Invalid delivery-settings mode ''{0}''. Valid values are async and sync. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(JMSConfigConstants.INVALID_DELIVERY_MODE_VALUE, new Object[] {mode}); - throw ce; - } - this.mode = mode; - } - - /** - * Returns the interval of the sync receive message call. - * - * @return The interval of the sync receive message call. - */ - public long getSyncReceiveIntervalMillis() - { - return syncReceiveIntervalMillis; - } - - /** - * Sets the interval of the receive message call. This property - * is optional and defaults to 100. - * - * @param syncReceiveIntervalMillis A positive long that indicates - * the interval of the receive message call. 
- */ - public void setSyncReceiveIntervalMillis(long syncReceiveIntervalMillis) - { - if (syncReceiveIntervalMillis < 1) - syncReceiveIntervalMillis = JMSConfigConstants.defaultSyncReceiveIntervalMillis; - this.syncReceiveIntervalMillis = syncReceiveIntervalMillis; - } - - /** - * Returns how long a JMS proxy waits for a message before returning. - * - * @return How long a JMS proxy waits for a message before returning. - */ - public long getSyncReceiveWaitMillis() - { - return syncReceiveWaitMillis; - } - - /** - * Sets how long a JMS proxy waits for a message before returning. - * This property is optional and defaults to zero (no wait). - * - * @param syncReceiveWaitMillis A non-negative value that indicates how - * long a JMS proxy waits for a message before returning. Zero means no - * wait, negative one means wait until a message arrives. - */ - public void setSyncReceiveWaitMillis(long syncReceiveWaitMillis) - { - if (syncReceiveWaitMillis < -1) - syncReceiveWaitMillis = JMSConfigConstants.defaultSyncReceiveWaitMillis; - this.syncReceiveWaitMillis = syncReceiveWaitMillis; - } - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSTopicConsumer.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSTopicConsumer.java b/core/src/flex/messaging/services/messaging/adapters/JMSTopicConsumer.java deleted file mode 100644 index 4a4498c..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSTopicConsumer.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import javax.jms.JMSException; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import javax.jms.TopicSession; -import javax.naming.NamingException; - -import flex.messaging.MessageException; -import flex.messaging.config.ConfigurationException; -import flex.messaging.log.Log; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Callable; - -/** - * A <code>JMSConsumer</code> subclass specifically for JMS Topic subscribers. - * - * - */ -public class JMSTopicConsumer extends JMSConsumer -{ - protected boolean durableConsumers; - protected String durableSubscriptionName; - - //-------------------------------------------------------------------------- - // - // Initialize, validate, start, and stop methods. - // - //-------------------------------------------------------------------------- - - /** - * Initialize with settings from the JMS adapter. - * - * @param settings JMS settings to use for initialization. 
- */ - public void initialize(JMSSettings settings) - { - super.initialize(settings); - durableConsumers = settings.useDurableConsumers(); - } - - /** - * Verifies that the <code>JMSTopicConsumer</code> is in valid state before - * it is started. For <code>JMSTopicConsumer</code> to be in valid state, - * it needs durableSubscriptionName if durableConsumers is true. - */ - protected void validate() - { - super.validate(); - - if (durableConsumers && durableSubscriptionName == null) - { - // JMS topic consumer for JMS destination ''{0}'' is configured to use durable subscriptions but it does not have a durable subscription name. - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(JMSConfigConstants.MISSING_DURABLE_SUBSCRIPTION_NAME, new Object[]{destinationJndiName}); - throw ce; - } - } - - /** - * Starts the <code>JMSTopicConsumer</code>. - * - * @throws NamingException - * @throws JMSException - */ - public void start() throws NamingException, JMSException - { - super.start(); - - // Establish topic - Topic topic; - try - { - topic = (Topic)destination; - } - catch (ClassCastException cce) - { - // JMS topic proxy for JMS destination ''{0}'' has a destination type of ''{1}'' which is not Topic. - MessageException me = new MessageException(); - me.setMessage(JMSConfigConstants.NON_TOPIC_DESTINATION, new Object[] {destinationJndiName, destination.getClass().getName()}); - throw me; - } - - // Create connection - TopicConnectionFactory topicFactory; - try - { - topicFactory = (TopicConnectionFactory)connectionFactory; - if (connectionCredentials != null) - connection = topicFactory.createTopicConnection(connectionCredentials.getUsername(), connectionCredentials.getPassword()); - else - connection = topicFactory.createTopicConnection(); - } - catch (ClassCastException cce) - { - // JMS topic proxy for JMS destination ''{0}'' has a connection factory type of ''{1}'' which is not TopicConnectionFactory. 
- MessageException me = new MessageException(); - me.setMessage(JMSConfigConstants.NON_TOPIC_FACTORY, new Object[] {destinationJndiName, connectionFactory.getClass().getName()}); - throw me; - } - - TopicConnection topicConnection = (TopicConnection)connection; - - if (durableConsumers) - { - try - { - if (Log.isDebug()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).debug("JMS consumer for JMS destination '" - + destinationJndiName + "' is setting its underlying connection's client id to " - + durableSubscriptionName + " for durable subscription."); - - topicConnection.setClientID(durableSubscriptionName); - } - catch (Exception e) - { - // Try to set the clientID in a seperate thread. - ExecutorService clientIdSetter = Executors.newSingleThreadExecutor(); - ClientIdSetterCallable cisc = new ClientIdSetterCallable(topicFactory, durableSubscriptionName); - Future future = clientIdSetter.submit(cisc); - try - { - topicConnection = (TopicConnection)future.get(); - } - catch (InterruptedException ie) - { - if (Log.isWarn()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("The proxied durable JMS subscription with name, " - + durableSubscriptionName + " could not set its client id " - + "on the topic connection because it was interrupted: " - + ie.toString()); - } - catch (ExecutionException ee) - { - // JMS topic consumer for JMS destination ''{0}'' is configured to use durable subscriptions but the application server does not permit javax.jms.Connection.setClientID method needed to support durable subscribers. Set durable property to false. 
- MessageException me = new MessageException(); - me.setMessage(JMSConfigConstants.DURABLE_SUBSCRIBER_NOT_SUPPORTED, new Object[]{destinationJndiName}); - throw me; - } - } - } - - // Create topic session on the connection - session = topicConnection.createTopicSession(false, getAcknowledgeMode()); - TopicSession topicSession = (TopicSession) session; - - // Create subscriber on topic session, handling message selectors and durable subscribers - if (selectorExpression != null) - { - if (durableConsumers && durableSubscriptionName != null) - consumer = topicSession.createDurableSubscriber(topic, durableSubscriptionName, selectorExpression, false); - else - consumer = topicSession.createSubscriber(topic, selectorExpression, false); - } - else - { - if (durableConsumers && durableSubscriptionName != null) - consumer = topicSession.createDurableSubscriber(topic, durableSubscriptionName); - else - consumer = topicSession.createSubscriber(topic); - } - - startMessageReceiver(); - } - - /** - * Stops the <code>JMSTopicConsumer</code> and unsubscribes a durable subscription - * if one exists. - * - * @param unsubscribe Determines whether to unsubscribe a durable subscription - * if one exists, or not. 
- */ - public void stop(boolean unsubscribe) - { - if (unsubscribe) - { - stopMessageReceiver(); - - try - { - if (consumer != null) - consumer.close(); - } - catch (Exception e) - { - if (Log.isWarn()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS consumer for JMS destination '" - + destinationJndiName + "' received an error while closing its underlying MessageConsumer: " - + e.getMessage()); - } - - if (durableConsumers) - { - try - { - TopicSession topicSession = (TopicSession)session; - topicSession.unsubscribe(durableSubscriptionName); - } - catch (Exception e) - { - if (Log.isWarn()) - Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("The proxied durable JMS subscription with name, " - + durableSubscriptionName + " failed to unsubscribe : " - + e.toString()); - } - } - } - super.stop(); - } - - //-------------------------------------------------------------------------- - // - // Public Getters and Setters for JMSConsumer properties - // - //-------------------------------------------------------------------------- - - /** - * Returns whether consumers are durable or not. - * - * @return <code>true</code> is consumers are durable, <code>false</code> - * otherwise. - */ - public boolean isDurableConsumers() - { - return durableConsumers; - } - - /** - * Sets whether consumers are durable or not. This property is optional - * and defaults to false and it should not changed after startup. - * - * @param durableConsumers A boolean indicating whether consumers should be durable. - */ - public void setDurableConsumers(boolean durableConsumers) - { - this.durableConsumers = durableConsumers; - } - - /** - * Returns the durable subscription name. - * - * @return The durable subscription name. - */ - public String getDurableSubscriptionName() - { - return durableSubscriptionName; - } - - /** - * Sets the durable subscription name that is used as clientID on the - * underlying connection when durable subscriptions are used. 
This property - * should not changed after startup. - * - * @param durableSubscriptionName The durable subscription name. - */ - public void setDurableSubscriptionName(String durableSubscriptionName) - { - this.durableSubscriptionName = durableSubscriptionName; - } - - //-------------------------------------------------------------------------- - // - // Protected and Private APIs - // - //-------------------------------------------------------------------------- - - /** - * Helper thread class to circumvent a Web/EJB container from preventing us - * from calling setClientId() on our topic connection. By spinning this out - * into a short lived thread the container loses track of what we're doing. - */ - class ClientIdSetterCallable implements Callable - { - private TopicConnectionFactory tcf; - private String clientId; - - private TopicConnection topicConnection; - - public ClientIdSetterCallable(TopicConnectionFactory tcf, String clientId) - { - this.tcf = tcf; - this.clientId = clientId; - } - - public Object call() throws JMSException - { - topicConnection = tcf.createTopicConnection(); - topicConnection.setClientID(clientId); - return topicConnection; - } - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSTopicProducer.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSTopicProducer.java b/core/src/flex/messaging/services/messaging/adapters/JMSTopicProducer.java deleted file mode 100644 index 3e4941e..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/JMSTopicProducer.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import flex.messaging.MessageException; - -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.ObjectMessage; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.naming.NamingException; -import java.io.Serializable; -import java.util.Map; - -/** - * A <code>JMSProducer</code> subclass specifically for JMS Topic publishers. - * - * - */ -public class JMSTopicProducer extends JMSProducer -{ - /* JMS related variables */ - private TopicPublisher publisher; - - /** - * Starts <code>JMSTopicProducer</code>. - */ - public void start() throws NamingException, JMSException - { - super.start(); - - // Establish topic - Topic topic = null; - try - { - topic = (Topic)destination; - } - catch (ClassCastException cce) - { - // JMS topic proxy for JMS destination ''{0}'' has a destination type of ''{1}'' which is not Topic. 
- MessageException me = new MessageException(); - me.setMessage(JMSConfigConstants.NON_TOPIC_DESTINATION, new Object[] {destinationJndiName, destination.getClass().getName()}); - throw me; - } - - // Create connection - try - { - TopicConnectionFactory topicFactory = (TopicConnectionFactory)connectionFactory; - if (connectionCredentials != null) - connection = topicFactory.createTopicConnection(connectionCredentials.getUsername(), connectionCredentials.getPassword()); - else - connection = topicFactory.createTopicConnection(); - } - catch (ClassCastException cce) - { - // JMS topic proxy for JMS destination ''{0}'' has a connection factory of type ''{1}'' which is not TopicConnectionFactory. - MessageException me = new MessageException(); - me.setMessage(JMSConfigConstants.NON_TOPIC_FACTORY, new Object[] {destinationJndiName, connectionFactory.getClass().getName()}); - throw me; - } - - // Create topic session on the connection - TopicConnection topicConnection = (TopicConnection)connection; - session = topicConnection.createTopicSession(false /* Always nontransacted */, getAcknowledgeMode()); - - // Create publisher on the topic session - TopicSession topicSession = (TopicSession)session; - publisher = topicSession.createPublisher(topic); - producer = publisher; - - // Start the connection - connection.start(); - } - - void sendObjectMessage(Serializable obj, Map properties) throws JMSException - { - if (obj == null) - return; - - ObjectMessage message = session.createObjectMessage(); - message.setObject(obj); - copyHeadersToProperties(properties, message); - publisher.publish(message, getDeliveryMode(), messagePriority, getTimeToLive(properties)); - } - - void sendTextMessage(String text, Map properties) throws JMSException - { - if (text == null) - return; - - TextMessage message = session.createTextMessage(); - message.setText(text); - copyHeadersToProperties(properties, message); - publisher.publish(message, getDeliveryMode(), messagePriority, 
getTimeToLive(properties)); - } - - @Override - void sendMapMessage(Map<String, ?> map, Map properties) throws JMSException - { - if (map == null) - return; - - MapMessage message = session.createMapMessage(); - for (Map.Entry<String, ?> entry : map.entrySet()) - message.setObject(entry.getKey(), entry.getValue()); - copyHeadersToProperties(properties, message); - publisher.publish(message, getDeliveryMode(), messagePriority, getTimeToLive(properties)); - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/MessageReceiver.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/MessageReceiver.java b/core/src/flex/messaging/services/messaging/adapters/MessageReceiver.java deleted file mode 100644 index 6f4cde9..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/MessageReceiver.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import javax.jms.JMSException; - -/** - * An interface used by <code>JMSConsumer</code> to receive messages from JMS. 
- * - * - */ -interface MessageReceiver -{ - /** - * Called by <code>JMSConsumer</code> as it starts up. - * - * @throws JMSException - */ - void startReceive() throws JMSException; - - /** - * Called by <code>JMSConsumer</code> as it stops. - */ - void stopReceive(); -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/MessagingAdapter.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/MessagingAdapter.java b/core/src/flex/messaging/services/messaging/adapters/MessagingAdapter.java deleted file mode 100644 index df3a1f8..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/MessagingAdapter.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging.services.messaging.adapters; - -import flex.messaging.Destination; -import flex.messaging.config.ConfigMap; -import flex.messaging.config.ConfigurationConstants; -import flex.messaging.config.DestinationSettings; -import flex.messaging.messages.CommandMessage; -import flex.messaging.security.MessagingSecurity; -import flex.messaging.services.ServiceAdapter; -import flex.messaging.services.messaging.Subtopic; - -/** - * Base adapter class for publish/subscribe messaging adapters. You extend this - * class if you want to implement your own messaging adapter. A custom messaging - * adapter has the ability to implement authorization logic for specific subtopics, - * and may also control how messages are routed. A more advanced messaging adapter - * can take control over the subscription process and have complete control over - * the subscription process for producers and consumers. - * <p> - * All messaging adapters must provide an implementation for the invoke method. - * A simple invoke implementation which would give you similar functionality as - * the ActionScriptAdapter is simply: - * <pre> - * public Object invoke(Message message) - * { - * MessageService msgService = (MessageService)service; - * msgService.pushMessageToClients(message, true); - * msgService.sendPushMessageFromPeer(message, true); - * return null; - * } - * </pre> - * </p> - * <p> - * This method is called for each data message sent from the client. It gets - * a reference to the MessageService which is controlling delivery of messages - * for this adapter. It uses the pushMessageToClients method to send the - * message to all clients connected to this server. It then uses the - * sendPushMessageFromPeer method to send the message to other servers which - * will then route the message to each client connected to those servers. - * In both cases, we pass the "evalSelector" parameter as true. 
This indicates - * that the message service will only send the message to those clients whose - * selector pattern evaluates to true for the supplied message. If you supply - * false, the selector pattern is ignored and the message is delivered to the clients - * even if the pattern evaluates to false. - * </p><p> - * The default behavior is for the messaging adapter to use the builtin Data Services - * subscription mechanism. The client sends subscribe and unsubscribe command - * messages which are managed by the MessageService, not the adapter. If you - * override the "handlesSubscriptions" method to return true, your adapter's - * manage method is called for each of these command messages instead. You must - * then override this method to provide an implementation for these operations. - * See the docs on the CommandMessage class for details on the message format. - * </p> - * - * @see flex.messaging.services.ServiceAdapter - * @see flex.messaging.services.MessageService - * @see flex.messaging.messages.Message - * @see flex.messaging.messages.CommandMessage - */ -public abstract class MessagingAdapter extends ServiceAdapter implements MessagingSecurity -{ - /** - * Constraint manager used to assert authorization of send and subscribe related operations. - */ - private MessagingSecurityConstraintManager constraintManager; - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>MessagingAdapter</code> instance. - */ - public MessagingAdapter() - { - this(false); - } - - /** - * Constructs a <code>MessagingAdapter</code> instance. - * - * @param enableManagement <code>true</code> if the <code>MessagingAdapter</code> - * has a corresponding MBean control for management; otherwise <code>false</code>. 
- */ - public MessagingAdapter(boolean enableManagement) - { - super(enableManagement); - } - - //-------------------------------------------------------------------------- - // - // Initialize, validate, start, and stop methods. - // - //-------------------------------------------------------------------------- - - /** - * Initializes the <code>MessagingAdapter</code> with the properties. - * Subclasses should call <code>super.initialize</code>. - * - * @param id Id of the <code>MessagingAdapter</code>. - * @param properties Properties for the <code>MessagingAdapter</code>. - */ - public void initialize(String id, ConfigMap properties) - { - super.initialize(id, properties); - - if (properties == null || properties.size() == 0) - return; - - ConfigMap serverSettings = properties.getPropertyAsMap(DestinationSettings.SERVER_ELEMENT, null); - if (serverSettings != null) - { - // Send constraint - ConfigMap send = serverSettings.getPropertyAsMap(MessagingSecurityConstraintManager.SEND_SECURITY_CONSTRAINT, null); - if (send != null) - { - String ref = send.getPropertyAsString(ConfigurationConstants.REF_ATTR, null); - if (ref != null) - { - if (constraintManager == null) - constraintManager = new MessagingSecurityConstraintManager(getDestination().getService().getMessageBroker()); - constraintManager.createSendConstraint(ref); - } - } - - // Subscribe constraint - ConfigMap subscribe = serverSettings.getPropertyAsMap(MessagingSecurityConstraintManager.SUBSCRIBE_SECURITY_CONSTRAINT, null); - if (subscribe != null) - { - String ref = subscribe.getPropertyAsString(ConfigurationConstants.REF_ATTR, null); - if (ref != null) - { - if (constraintManager == null) - constraintManager = new MessagingSecurityConstraintManager(getDestination().getService().getMessageBroker()); - constraintManager.createSubscribeConstraint(ref); - } - } - } - } - - /** - * Verifies that the <code>MessagingAdapter</code> is in valid state before - * it is started. 
If subclasses override, they must call <code>super.validate()</code>. - */ - protected void validate() - { - if (isValid()) - return; - - super.validate(); - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Implements flex.messaging.security.MessagingSecurity. - * This method is invoked before a client subscribe request is processed, - * so that custom application logic can determine whether the client - * should be allowed to subscribe to the specified subtopic. You can access - * the current user via - * <code>FlexContext.getUserPrincipal()</code>. - * - * @param subtopic The subtopic the client is attempting to subscribe to. - * @return true to allow the subscription, false to prevent it. - */ - public boolean allowSubscribe(Subtopic subtopic) - { - return true; - } - - /** - * Implements flex.messaging.security.MessagingSecurity. - * This method is invoked before a client message is sent to a subtopic, - * so that custom application logic can determine whether the client - * should be allowed to send to the specified subtopic. You can access - * the current user via - * <code>FlexContext.getUserPrincipal()</code>. - * - * @param subtopic The subtopic the client is attempting to send a message to. - * @return true to allow the message to be sent, false to prevent it. - */ - public boolean allowSend(Subtopic subtopic) - { - return true; - } - - /** - * Gets the <code>MessagingSecurityConstraintManager</code> of the <code>MessagingAdapter</code>. - * - * @return The <code>MessagingSecurityConstraintManager</code> of the <code>MessagingAdapter</code>. - */ - public MessagingSecurityConstraintManager getSecurityConstraintManager() - { - return constraintManager; - } - - /** - * Sets the <code>MessagingSecurityConstraintManager</code> of the <code>MessagingAdapter</code>. 
- * - * @param constraintManager The <code>MessagingSecurityConstraintManager</code> of the <code>MessagingAdapter</code>. - */ - public void setSecurityConstraintManager(MessagingSecurityConstraintManager constraintManager) - { - this.constraintManager = constraintManager; - } - -}
