http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java new file mode 100644 index 0000000..8bee2e3 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java @@ -0,0 +1,643 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.message; + +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.exceptions.JmsExceptionSupport; +import org.apache.qpid.jms.message.facade.JmsMessageFacade; +import org.apache.qpid.jms.meta.JmsMessageId; +import org.apache.qpid.jms.util.TypeConversionSupport; + +public class JmsMessage implements javax.jms.Message { + + protected transient Callable<Void> acknowledgeCallback; + protected transient JmsConnection connection; + + protected final JmsMessageFacade facade; + protected boolean readOnlyBody; + protected boolean readOnlyProperties; + + public JmsMessage(JmsMessageFacade facade) { + this.facade = facade; + } + + public JmsMessage copy() throws JMSException { + JmsMessage other = new JmsMessage(facade.copy()); + other.copy(this); + return other; + } + + protected void copy(JmsMessage other) { + this.readOnlyBody = other.readOnlyBody; + this.readOnlyProperties = other.readOnlyBody; + this.acknowledgeCallback = other.acknowledgeCallback; + this.connection = other.connection; + } + + @Override + public int hashCode() { + String id = null; + try { + id = getJMSMessageID(); + } catch (JMSException e) { + } + + if (id != null) { + return id.hashCode(); + } else { + return super.hashCode(); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || o.getClass() != getClass()) { + return false; + } + + JmsMessage msg = (JmsMessage) o; + JmsMessageId oMsg = null; + JmsMessageId thisMsg = null; + + thisMsg = facade.getMessageId(); + oMsg = msg.facade.getMessageId(); + + return thisMsg != null && oMsg != null && oMsg.equals(thisMsg); + } + + @Override + public void acknowledge() throws JMSException { + if (acknowledgeCallback != null) { + try { + acknowledgeCallback.call(); + } catch (Throwable e) { + throw JmsExceptionSupport.create(e); + } + } + } + + @Override + public void clearBody() throws JMSException { + readOnlyBody = false; + facade.clearBody(); + } + + public boolean isReadOnlyBody() { + return this.readOnlyBody; + } + + public void setReadOnlyBody(boolean readOnlyBody) { + this.readOnlyBody = readOnlyBody; + } + + public void setReadOnlyProperties(boolean readOnlyProperties) { + this.readOnlyProperties = readOnlyProperties; + } + + @Override + public String getJMSMessageID() throws JMSException { + if (facade.getMessageId() == null) { + return null; + } + return facade.getMessageId().toString(); + } + + @Override + public void setJMSMessageID(String value) throws JMSException { + if (value != null) { + JmsMessageId id = new JmsMessageId(value); + facade.setMessageId(id); + } else { + facade.setMessageId(null); + } + } + + public void setJMSMessageID(JmsMessageId messageId) throws JMSException { + facade.setMessageId(messageId); + } + + @Override + public long getJMSTimestamp() throws JMSException { + return facade.getTimestamp(); + } + + @Override + public void setJMSTimestamp(long timestamp) throws JMSException { + facade.setTimestamp(timestamp); + } + + @Override + public String getJMSCorrelationID() throws JMSException { + return facade.getCorrelationId(); + } + + @Override + public void setJMSCorrelationID(String correlationId) throws JMSException { + facade.setCorrelationId(correlationId); + } + + @Override + public byte[] getJMSCorrelationIDAsBytes() throws JMSException { + return facade.getCorrelationIdBytes(); + } + + @Override + public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws JMSException { + facade.setCorrelationIdBytes(correlationId); + } + + @Override + public Destination getJMSReplyTo() throws JMSException { + return facade.getReplyTo(); + } + + @Override + public void setJMSReplyTo(Destination destination) throws JMSException { + facade.setReplyTo(JmsMessageTransformation.transformDestination(connection, destination)); + } + + @Override + public Destination getJMSDestination() throws JMSException { + return facade.getDestination(); + } + + @Override + public void setJMSDestination(Destination destination) throws JMSException { + facade.setDestination(JmsMessageTransformation.transformDestination(connection, destination)); + } + + @Override + public int getJMSDeliveryMode() throws JMSException { + return facade.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; + } + + @Override + public void setJMSDeliveryMode(int mode) throws JMSException { + facade.setPersistent(mode == DeliveryMode.PERSISTENT); + } + + @Override + public boolean getJMSRedelivered() throws JMSException { + return this.isRedelivered(); + } + + @Override + public void setJMSRedelivered(boolean redelivered) throws JMSException { + this.setRedelivered(redelivered); + } + + @Override + public String getJMSType() throws JMSException { + return facade.getType(); + } + + @Override + public void setJMSType(String type) throws JMSException { + facade.setType(type); + } + + @Override + public long getJMSExpiration() throws JMSException { + return facade.getExpiration(); + } + + @Override + public void setJMSExpiration(long expiration) throws JMSException { + facade.setExpiration(expiration); + } + + @Override + public int getJMSPriority() throws JMSException { + return facade.getPriority(); + } + + @Override + public void setJMSPriority(int priority) throws JMSException { + byte scaled = 0; + + if (priority < 0) { + scaled = 0; + } else if (priority > 9) { + scaled = 9; + } else { + scaled = (byte) priority; + } + + facade.setPriority(scaled); + } + + @Override + public void clearProperties() throws JMSException { + facade.clearProperties(); + } + + @Override + public boolean propertyExists(String name) throws JMSException { + return JmsMessagePropertyIntercepter.propertyExists(facade, name); + } + + /** + * Returns an unmodifiable Map containing the properties contained within the message. + * + * @return unmodifiable Map of the current properties in the message. + * + * @throws JMSException if there is an error accessing the message properties. + */ + public Map<String, Object> getProperties() throws JMSException { + return Collections.unmodifiableMap(facade.getProperties()); + } + + /** + * Allows for a direct put of an Object value into the message properties. + * + * This method bypasses the normal JMS type checking for properties being set on + * the message and should be used with great care. + * + * @param key + * the property name to use when setting the value. + * @param value + * the value to insert into the message properties. + * + * @throws JMSException if an error occurs while accessing the Message properties. + */ + public void setProperty(String key, Object value) throws JMSException { + this.facade.setProperty(key, value); + } + + /** + * Returns the Object value referenced by the given key. + * + * @param key + * the name of the property being accessed. + * + * @return the value stored at the given location or null if non set. + * + * @throws JMSException if an error occurs while accessing the Message properties. + */ + public Object getProperty(String key) throws JMSException { + return this.facade.getProperty(key); + } + + @Override + public Enumeration<?> getPropertyNames() throws JMSException { + Set<String> result = new HashSet<String>(facade.getProperties().keySet()); + return Collections.enumeration(result); + } + + /** + * return all property names, including standard JMS properties and JMSX + * properties + * + * @return Enumeration of all property names on this message + * @throws JMSException + */ + public Enumeration<?> getAllPropertyNames() throws JMSException { + Set<String> result = new HashSet<String>(facade.getProperties().keySet()); + result.addAll(JmsMessagePropertyIntercepter.getAllPropertyNames()); + return Collections.enumeration(result); + } + + @Override + public void setObjectProperty(String name, Object value) throws JMSException { + checkReadOnlyProperties(); + checkPropertyNameIsValid(name); + checkValidObject(value); + JmsMessagePropertyIntercepter.setProperty(facade, name, value); + } + + public void setProperties(Map<String, Object> properties) throws JMSException { + for (Iterator<Map.Entry<String, Object>> iter = properties.entrySet().iterator(); iter.hasNext();) { + Map.Entry<String, Object> entry = iter.next(); + setObjectProperty(entry.getKey(), entry.getValue()); + } + } + + protected void checkValidObject(Object value) throws MessageFormatException { + boolean valid = value instanceof Boolean || + value instanceof Byte || + value instanceof Short || + value instanceof Integer || + value instanceof Long || + value instanceof Float || + value instanceof Double || + value instanceof Character || + value instanceof String || + value == null; + + if (!valid) { + throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass()); + } + } + + @Override + public Object getObjectProperty(String name) throws JMSException { + if (name == null) { + throw new NullPointerException("Property name cannot be null"); + } + + return JmsMessagePropertyIntercepter.getProperty(facade, name); + } + + @Override + public boolean getBooleanProperty(String name) throws JMSException { + Object value = getObjectProperty(name); + if (value == null) { + return false; + } + Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class); + if (rc == null) { + throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean"); + } + return rc.booleanValue(); + } + + @Override + public byte getByteProperty(String name) throws JMSException { + Object value = getObjectProperty(name); + if (value == null) { + throw new NumberFormatException("property " + name + " was null"); + } + Byte rc = (Byte) TypeConversionSupport.convert(value, Byte.class); + if (rc == null) { + throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte"); + } + return rc.byteValue(); + } + + @Override + public short getShortProperty(String name) throws JMSException { + Object value = getObjectProperty(name); + if (value == null) { + throw new NumberFormatException("property " + name + " was null"); + } + Short rc = (Short) TypeConversionSupport.convert(value, Short.class); + if (rc == null) { + throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short"); + } + return rc.shortValue(); + } + + @Override + public int getIntProperty(String name) throws JMSException { + Object value = getObjectProperty(name); + if (value == null) { + throw new NumberFormatException("property " + name + " was null"); + } + Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class); + if (rc == null) { + throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer"); + } + return rc.intValue(); + } + + @Override + public long getLongProperty(String name) throws JMSException { + Object value = getObjectProperty(name); + if (value == null) { + throw new NumberFormatException("property " + name + " was null"); + } + Long rc = (Long) TypeConversionSupport.convert(value, Long.class); + if (rc == null) { + throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long"); + } + return rc.longValue(); + } + + @Override + public float getFloatProperty(String name) throws JMSException { + Object value = getObjectProperty(name); + if (value == null) { + throw new NullPointerException("property " + name + " was null"); + } + Float rc = (Float) TypeConversionSupport.convert(value, Float.class); + if (rc == null) { + throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float"); + } + return rc.floatValue(); + } + + @Override + public double getDoubleProperty(String name) throws JMSException { + Object value = getObjectProperty(name); + if (value == null) { + throw new NullPointerException("property " + name + " was null"); + } + Double rc = (Double) TypeConversionSupport.convert(value, Double.class); + if (rc == null) { + throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double"); + } + return rc.doubleValue(); + } + + @Override + public String getStringProperty(String name) throws JMSException { + Object value = getObjectProperty(name); + if (value == null) { + return null; + } + String rc = (String) TypeConversionSupport.convert(value, String.class); + if (rc == null) { + throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a String"); + } + return rc; + } + + @Override + public void setBooleanProperty(String name, boolean value) throws JMSException { + setObjectProperty(name, Boolean.valueOf(value)); + } + + @Override + public void setByteProperty(String name, byte value) throws JMSException { + setObjectProperty(name, Byte.valueOf(value)); + } + + @Override + public void setShortProperty(String name, short value) throws JMSException { + setObjectProperty(name, Short.valueOf(value)); + } + + @Override + public void setIntProperty(String name, int value) throws JMSException { + setObjectProperty(name, Integer.valueOf(value)); + } + + @Override + public void setLongProperty(String name, long value) throws JMSException { + setObjectProperty(name, Long.valueOf(value)); + } + + @Override + public void setFloatProperty(String name, float value) throws JMSException { + setObjectProperty(name, new Float(value)); + } + + @Override + public void setDoubleProperty(String name, double value) throws JMSException { + setObjectProperty(name, new Double(value)); + } + + @Override + public void setStringProperty(String name, String value) throws JMSException { + setObjectProperty(name, value); + } + + public Callable<Void> getAcknowledgeCallback() { + return acknowledgeCallback; + } + + public void setAcknowledgeCallback(Callable<Void> acknowledgeCallback) { + this.acknowledgeCallback = acknowledgeCallback; + } + + /** + * Send operation event listener. Used to get the message ready to be sent. + * + * @throws JMSException + */ + public void onSend() throws JMSException { + setReadOnlyBody(true); + setReadOnlyProperties(true); + facade.onSend(); + } + + public JmsConnection getConnection() { + return connection; + } + + public void setConnection(JmsConnection connection) { + this.connection = connection; + } + + public boolean isExpired() throws JMSException { + long expireTime = facade.getExpiration(); + return expireTime > 0 && System.currentTimeMillis() > expireTime; + } + + public void incrementRedeliveryCount() { + facade.setRedeliveryCounter(facade.getRedeliveryCounter() + 1); + } + + public JmsMessageFacade getFacade() { + return this.facade; + } + + public boolean isRedelivered() throws JMSException { + return facade.isRedelivered(); + } + + public void setRedelivered(boolean redelivered) throws JMSException { + if (redelivered) { + if (!isRedelivered()) { + facade.setRedeliveryCounter(1); + } + } else { + if (isRedelivered()) { + facade.setRedeliveryCounter(0); + } + } + } + + protected void checkReadOnlyProperties() throws MessageNotWriteableException { + if (readOnlyProperties) { + throw new MessageNotWriteableException("Message properties are read-only"); + } + } + + protected void checkReadOnlyBody() throws MessageNotWriteableException { + if (readOnlyBody) { + throw new MessageNotWriteableException("Message body is read-only"); + } + } + + protected void checkWriteOnlyBody() throws MessageNotReadableException { + if (!readOnlyBody) { + throw new MessageNotReadableException("Message body is write-only"); + } + } + + private void checkPropertyNameIsValid(String propertyName) throws IllegalArgumentException { + if (propertyName == null) { + throw new IllegalArgumentException("Property name must not be null"); + } else if (propertyName.length() == 0) { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + checkIdentifierFormat(propertyName); + } + + private void checkIdentifierFormat(String identifier) throws IllegalArgumentException { + checkIdentifierLetterAndDigitRequirements(identifier); + checkIdentifierIsntNullTrueFalse(identifier); + checkIdentifierIsntLogicOperator(identifier); + } + + private void checkIdentifierIsntLogicOperator(String identifier) { + // Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or ESCAPE. + if ("NOT".equals(identifier) || "AND".equals(identifier) || "OR".equals(identifier) || + "BETWEEN".equals(identifier) || "LIKE".equals(identifier) || "IN".equals(identifier) || + "IS".equals(identifier) || "ESCAPE".equals(identifier)) { + + throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'"); + } + } + + private void checkIdentifierIsntNullTrueFalse(String identifier) { + // Identifiers cannot be the names NULL, TRUE, and FALSE. + if ("NULL".equals(identifier) || "TRUE".equals(identifier) || "FALSE".equals(identifier)) { + throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'"); + } + } + + private void checkIdentifierLetterAndDigitRequirements(String identifier) { + // An identifier is an unlimited-length sequence of letters and digits, the first of + // which must be a letter. A letter is any character for which the method + // Character.isJavaLetter returns true. This includes '_' and '$'. A letter or digit + // is any character for which the method Character.isJavaLetterOrDigit returns true. + char startChar = identifier.charAt(0); + if (!(Character.isJavaIdentifierStart(startChar))) { + throw new IllegalArgumentException("Identifier does not begin with a valid JMS identifier start character: '" + identifier + "' "); + } + + // JMS part character + int length = identifier.length(); + for (int i = 1; i < length; i++) { + char ch = identifier.charAt(i); + if (!(Character.isJavaIdentifierPart(ch))) { + throw new IllegalArgumentException("Identifier contains invalid JMS identifier character '" + ch + "': '" + identifier + "' "); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageFactory.java new file mode 100644 index 0000000..91c148c --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageFactory.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.message; + +import java.io.Serializable; + +import javax.jms.JMSException; + +/** + * Interface that a Provider should implement to provide a Provider + * Specific JmsMessage implementation that optimizes the exchange of + * message properties and payload between the JMS Message API and the + * underlying Provider Message implementations. + */ +public interface JmsMessageFactory { + + /** + * Creates an instance of a basic JmsMessage object. The provider may + * either create the Message with the default generic internal message + * implementation or create a Provider specific instance that optimizes + * the access and marshaling of the message. + * + * @return a newly created and initialized JmsMessage instance. + * + * @throws JMSException if the provider cannot create the message for some reason. + */ + JmsMessage createMessage() throws JMSException; + + /** + * Creates an instance of a basic JmsTextMessage object. The provider may + * either create the Message with the default generic internal message + * implementation or create a Provider specific instance that optimizes + * the access and marshaling of the message. + * + * @param payload + * The value to initially assign to the Message body, or null if empty to start. + * + * @returns a newly created and initialized JmsTextMessage instance. + * + * @throws JMSException if the provider cannot create the message for some reason. + */ + JmsTextMessage createTextMessage(String payload) throws JMSException; + + /** + * Creates an instance of a basic JmsTextMessage object. The provider may + * either create the Message with the default generic internal message + * implementation or create a Provider specific instance that optimizes + * the access and marshaling of the message. + * + * @returns a newly created and initialized JmsTextMessage instance. + * + * @throws JMSException if the provider cannot create the message for some reason. + */ + JmsTextMessage createTextMessage() throws JMSException; + + /** + * Creates an instance of a basic JmsBytesMessage object. The provider may + * either create the Message with the default generic internal message + * implementation or create a Provider specific instance that optimizes + * the access and marshaling of the message. + * + * @returns a newly created and initialized JmsTextMessage instance. + * + * @throws JMSException if the provider cannot create the message for some reason. + */ + JmsBytesMessage createBytesMessage() throws JMSException; + + /** + * Creates an instance of a basic JmsMapMessage object. The provider may + * either create the Message with the default generic internal message + * implementation or create a Provider specific instance that optimizes + * the access and marshaling of the message. + * + * @returns a newly created and initialized JmsTextMessage instance. + * + * @throws JMSException if the provider cannot create the message for some reason. + */ + JmsMapMessage createMapMessage() throws JMSException; + + /** + * Creates an instance of a basic JmsStreamMessage object. The provider may + * either create the Message with the default generic internal message + * implementation or create a Provider specific instance that optimizes + * the access and marshaling of the message. + * + * @returns a newly created and initialized JmsTextMessage instance. + * + * @throws JMSException if the provider cannot create the message for some reason. + */ + JmsStreamMessage createStreamMessage() throws JMSException; + + /** + * Creates an instance of a basic JmsObjectMessage object. The provider may + * either create the Message with the default generic internal message + * implementation or create a Provider specific instance that optimizes + * the access and marshaling of the message. + * + * @param payload + * The value to initially assign to the Message body, or null if empty to start. + * + * @returns a newly created and initialized JmsObjectMessage instance. + * + * @throws JMSException if the provider cannot create the message for some reason. + */ + JmsObjectMessage createObjectMessage(Serializable payload) throws JMSException; + + /** + * Creates an instance of a basic JmsObjectMessage object. The provider may + * either create the Message with the default generic internal message + * implementation or create a Provider specific instance that optimizes + * the access and marshaling of the message. + * + * @returns a newly created and initialized JmsObjectMessage instance. + * + * @throws JMSException if the provider cannot create the message for some reason. + */ + JmsObjectMessage createObjectMessage() throws JMSException; + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java new file mode 100644 index 0000000..863f420 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java @@ -0,0 +1,622 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.message; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.exceptions.JmsExceptionSupport; +import org.apache.qpid.jms.message.facade.JmsMessageFacade; +import org.apache.qpid.jms.meta.JmsMessageId; +import org.apache.qpid.jms.util.TypeConversionSupport; + +/** + * Utility class used to intercept calls to Message property gets and sets and map the + * correct fields in the underlying JmsMessageFacade to the property name being operated on. + */ +public class JmsMessagePropertyIntercepter { + + private static final Map<String, PropertyIntercepter> PROPERTY_INTERCEPTERS = + new HashMap<String, PropertyIntercepter>(); + + /** + * Interface for a Property intercepter object used to write JMS style + * properties that are part of the JMS Message object members or perform + * some needed conversion action before some named property is read or + * written. If a property is not writable then the intercepter should + * throw an JMSException to indicate the error. + */ + interface PropertyIntercepter { + + /** + * Called when the names property is queried from an JMS Message object. + * + * @param message + * The message being acted upon. + * + * @return the correct property value from the given Message. + * + * @throws JMSException if an error occurs while accessing the property + */ + Object getProperty(JmsMessageFacade message) throws JMSException; + + /** + * Called when the names property is assigned from an JMS Message object. + * + * @param message + * The message instance being acted upon. + * @param value + * The value to assign to the intercepted property. + * + * @throws JMSException if an error occurs writing the property. + */ + void setProperty(JmsMessageFacade message, Object value) throws JMSException; + + /** + * Indicates if the intercepted property has a value currently assigned. + * + * @param message + * The message instance being acted upon. + * + * @return true if the intercepted property has a value assigned to it. + */ + boolean propertyExists(JmsMessageFacade message); + + } + + static { + PROPERTY_INTERCEPTERS.put("JMSXDeliveryCount", new PropertyIntercepter() { + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class); + if (rc == null) { + throw new JMSException("Property JMSXDeliveryCount cannot be set from a " + value.getClass().getName() + "."); + } + message.setRedeliveryCounter(rc.intValue() - 1); + } + + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + return Integer.valueOf(message.getRedeliveryCounter() + 1); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return true; + } + }); + PROPERTY_INTERCEPTERS.put("JMSDestination", new PropertyIntercepter() { + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + JmsDestination rc = (JmsDestination) TypeConversionSupport.convert(value, JmsDestination.class); + if (rc == null) { + throw new JMSException("Property JMSDestination cannot be set from a " + value.getClass().getName() + "."); + } + message.setDestination(rc); + } + + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + Destination dest = message.getDestination(); + if (dest == null) { + return null; + } + return dest.toString(); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return message.getDestination() != null; + } + }); + PROPERTY_INTERCEPTERS.put("JMSReplyTo", new PropertyIntercepter() { + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + JmsDestination rc = (JmsDestination) TypeConversionSupport.convert(value, JmsDestination.class); + if (rc == null) { + throw new JMSException("Property JMSReplyTo cannot be set from a " + value.getClass().getName() + "."); + } + message.setReplyTo(rc); + } + + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + if (message.getReplyTo() == null) { + return null; + } + return message.getReplyTo().toString(); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return message.getReplyTo() != null; + } + }); + PROPERTY_INTERCEPTERS.put("JMSType", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + return message.getType(); + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + String rc = (String) TypeConversionSupport.convert(value, String.class); + if (rc == null) { + throw new JMSException("Property JMSType cannot be set from a " + value.getClass().getName() + "."); + } + message.setType(rc); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return message.getType() != null; + } + }); + PROPERTY_INTERCEPTERS.put("JMSDeliveryMode", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + return message.isPersistent() ? "PERSISTENT" : "NON_PERSISTENT"; + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + Integer rc = null; + try { + rc = (Integer) TypeConversionSupport.convert(value, Integer.class); + } catch (NumberFormatException nfe) { + if (value instanceof String) { + if (((String) value).equalsIgnoreCase("PERSISTENT")) { + rc = DeliveryMode.PERSISTENT; + } else if (((String) value).equalsIgnoreCase("NON_PERSISTENT")) { + rc = DeliveryMode.NON_PERSISTENT; + } else { + throw nfe; + } + } + } + if (rc == null) { + Boolean bool = (Boolean) TypeConversionSupport.convert(value, Boolean.class); + if (bool == null) { + throw new JMSException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + "."); + } else { + message.setPersistent(bool.booleanValue()); + } + } else { + message.setPersistent(rc == DeliveryMode.PERSISTENT); + } + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return true; + } + }); + PROPERTY_INTERCEPTERS.put("JMSPriority", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + return Integer.valueOf(message.getPriority()); + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class); + if (rc == null) { + throw new JMSException("Property JMSPriority cannot be set from a " + value.getClass().getName() + "."); + } + message.setPriority(rc.byteValue()); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return true; + } + }); + PROPERTY_INTERCEPTERS.put("JMSMessageID", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + if (message.getMessageId() == null) { + return null; + } + return message.getMessageId().toString(); + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + String rc = (String) TypeConversionSupport.convert(value, String.class); + if (rc == null) { + throw new JMSException("Property JMSMessageID cannot be set from a " + value.getClass().getName() + "."); + } + message.setMessageId(new JmsMessageId(rc)); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return message.getMessageId() != null; + } + }); + PROPERTY_INTERCEPTERS.put("JMSTimestamp", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + return Long.valueOf(message.getTimestamp()); + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + Long rc = (Long) TypeConversionSupport.convert(value, Long.class); + if (rc == null) { + throw new JMSException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + "."); + } + message.setTimestamp(rc.longValue()); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return true; + } + }); + PROPERTY_INTERCEPTERS.put("JMSCorrelationID", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + return message.getCorrelationId(); + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + String rc = (String) TypeConversionSupport.convert(value, String.class); + if (rc == null) { + throw new JMSException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + "."); + } + message.setCorrelationId(rc); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return message.getCorrelationId() != null; + } + }); + PROPERTY_INTERCEPTERS.put("JMSExpiration", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + return Long.valueOf(message.getExpiration()); + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + Long rc = (Long) TypeConversionSupport.convert(value, Long.class); + if (rc == null) { + throw new JMSException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + "."); + } + message.setExpiration(rc.longValue()); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return true; + } + }); + PROPERTY_INTERCEPTERS.put("JMSRedelivered", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + return Boolean.valueOf(message.isRedelivered()); + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class); + if (rc == null) { + throw new JMSException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + "."); + } + message.setRedelivered(rc.booleanValue()); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return true; + } + }); + PROPERTY_INTERCEPTERS.put("JMSXGroupID", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + return message.getGroupId(); + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + String rc = (String) TypeConversionSupport.convert(value, String.class); + if (rc == null) { + throw new JMSException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + "."); + } + message.setGroupId(rc); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return message.getGroupId() != null; + } + }); + PROPERTY_INTERCEPTERS.put("JMSXGroupSeq", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + return message.getGroupSequence(); + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class); + if (rc == null) { + throw new JMSException("Property JMSXGroupSeq cannot be set from a " + value.getClass().getName() + "."); + } + message.setGroupSequence(rc.intValue()); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return true; + } + }); + PROPERTY_INTERCEPTERS.put("JMSXUserID", new PropertyIntercepter() { + @Override + public Object getProperty(JmsMessageFacade message) throws JMSException { + Object userId = message.getUserId(); + if (userId == null) { + try { + userId = message.getProperty("JMSXUserID"); + } catch (Exception e) { + throw JmsExceptionSupport.create(e); + } + } + + return userId; + } + + @Override + public void setProperty(JmsMessageFacade message, Object value) throws JMSException { + String rc = (String) TypeConversionSupport.convert(value, String.class); + if (rc == null) { + throw new JMSException("Property JMSXUserID cannot be set from a " + value.getClass().getName() + "."); + } + message.setUserId(rc); + } + + @Override + public boolean propertyExists(JmsMessageFacade message) { + return message.getUserId() != null; + } + }); + } + + /** + * Static get method that takes a property name and gets the value either via + * a registered property get object or through the JmsMessageFacade getProperty + * method. + * + * @param message + * the JmsMessageFacade instance to read from + * @param name + * the property name that is being requested. + * + * @return the correct value either mapped to an Message attribute of a Message property. + * + * @throws JMSException if an error occurs while reading the defined property. + */ + public static Object getProperty(JmsMessageFacade message, String name) throws JMSException { + Object value = null; + + PropertyIntercepter jmsPropertyExpression = PROPERTY_INTERCEPTERS.get(name); + if (jmsPropertyExpression != null) { + value = jmsPropertyExpression.getProperty(message); + } else { + value = message.getProperty(name); + } + + return value; + } + + /** + * Static set method that takes a property name and sets the value either via + * a registered property set object or through the JmsMessageFacade setProperty + * method. + * + * @param message + * the JmsMessageFacade instance to write to. + * @param name + * the property name that is being written. + * @param value + * the new value to assign for the named property. + * + * @throws JMSException if an error occurs while writing the defined property. + */ + public static void setProperty(JmsMessageFacade message, String name, Object value) throws JMSException { + PropertyIntercepter jmsPropertyExpression = PROPERTY_INTERCEPTERS.get(name); + if (jmsPropertyExpression != null) { + jmsPropertyExpression.setProperty(message, value); + } else { + message.setProperty(name, value); + } + } + + /** + * Static inspection method to determine if a named property exists for a given message. + * + * @param message + * the JmsMessageFacade instance to read from + * @param name + * the property name that is being inspected. + * + * @return true if the message contains the given property. + * + * @throws JMSException if an error occurs while validating the defined property. + */ + public static boolean propertyExists(JmsMessageFacade message, String name) throws JMSException { + PropertyIntercepter jmsPropertyExpression = PROPERTY_INTERCEPTERS.get(name); + if (jmsPropertyExpression != null) { + return jmsPropertyExpression.propertyExists(message); + } else { + return message.propertyExists(name); + } + } + + /** + * For each of the currently configured message property intercepter instance a + * string key value is inserted into an Set and returned. + * + * @return a Set<String> containing the names of all intercepted properties. + */ + public static Set<String> getAllPropertyNames() { + return PROPERTY_INTERCEPTERS.keySet(); + } + + /** + * For each of the currently configured message property intercepter instance a + * string key value is inserted into an Set and returned if the property has a + * value and is available for a read operation. + * + * @return a Set<String> containing the names of all intercepted properties with a value. + */ + public static Set<String> getPropertyNames(JmsMessageFacade message) { + Set<String> names = new HashSet<String>(); + for (Entry<String, PropertyIntercepter> entry : PROPERTY_INTERCEPTERS.entrySet()) { + if (entry.getValue().propertyExists(message)) { + names.add(entry.getKey()); + } + } + return names; + } + + /** + * Allows for the additional PropertyIntercepter instances to be added to the global set. + * + * @param propertyName + * The name of the Message property that will be intercepted. + * @param getter + * The PropertyIntercepter instance that should be used for the named property. + */ + public static void addPropertySetter(String propertyName, PropertyIntercepter getter) { + PROPERTY_INTERCEPTERS.put(propertyName, getter); + } + + /** + * Given a property name, remove the configured intercepter that has been assigned to + * intercept the queries for that property value. + * + * @param propertyName + * The name of the PropertyIntercepter to remove. + * + * @return true if a getter was removed from the global set. + */ + public boolean removePropertySetter(String propertyName) { + if (PROPERTY_INTERCEPTERS.remove(propertyName) != null) { + return true; + } + + return false; + } + + private final String name; + private final PropertyIntercepter jmsPropertyExpression; + + /** + * Creates an new property getter instance that is assigned to read the named value. + * + * @param name + * the property value that this getter is assigned to lookup. + */ + public JmsMessagePropertyIntercepter(String name) { + this.name = name; + this.jmsPropertyExpression = PROPERTY_INTERCEPTERS.get(name); + } + + /** + * Gets the correct property value from the JmsMessageFacade instance based on + * the predefined property mappings. + * + * @param message + * the JmsMessageFacade whose property is being read. + * + * @return the correct value either mapped to an Message attribute of a Message property. + * + * @throws JMSException if an error occurs while reading the defined property. + */ + public Object get(JmsMessageFacade message) throws JMSException { + if (jmsPropertyExpression != null) { + return jmsPropertyExpression.getProperty(message); + } + + return message.getProperty(name); + } + + /** + * Sets the correct property value from the JmsMessageFacade instance based on + * the predefined property mappings. + * + * @param message + * the JmsMessageFacade whose property is being read. + * @param value + * the value to be set on the intercepted JmsMessageFacade property. + * + * @throws JMSException if an error occurs while reading the defined property. + */ + public void set(JmsMessageFacade message, Object value) throws JMSException { + if (jmsPropertyExpression != null) { + jmsPropertyExpression.setProperty(message, value); + } else { + message.setProperty(name, value); + } + } + + /** + * @return the property name that is being intercepted for the JmsMessageFacade. + */ + public String getName() { + return name; + } + + /** + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return name; + } + + /** + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return name.hashCode(); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object o) { + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return name.equals(((JmsMessagePropertyIntercepter) o).name); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageTransformation.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageTransformation.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageTransformation.java new file mode 100644 index 0000000..404cabd --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageTransformation.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.message; + +import java.util.Enumeration; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageEOFException; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.JmsQueue; +import org.apache.qpid.jms.JmsTemporaryQueue; +import org.apache.qpid.jms.JmsTemporaryTopic; +import org.apache.qpid.jms.JmsTopic; + +/** + * A helper class for converting normal JMS interfaces into the QpidJMS specific + * versions. + */ +public final class JmsMessageTransformation { + + private JmsMessageTransformation() { + } + + /** + * Creates a an available JMS message from another provider. + * + * @param destination + * - Destination to be converted into Jms's implementation. + * @return JmsDestination - Jms's implementation of the + * destination. + * @throws JMSException + * @throws JMSException + * if an error occurs + */ + public static JmsDestination transformDestination(JmsConnection connection, Destination destination) throws JMSException { + JmsDestination result = null; + + if (destination != null) { + if (destination instanceof JmsDestination) { + return (JmsDestination) destination; + + } else { + if (destination instanceof TemporaryQueue) { + result = new JmsTemporaryQueue(((TemporaryQueue) destination).getQueueName()); + } else if (destination instanceof TemporaryTopic) { + result = new JmsTemporaryTopic(((TemporaryTopic) destination).getTopicName()); + } else if (destination instanceof Queue) { + result = new JmsQueue(((Queue) destination).getQueueName()); + } else if (destination instanceof Topic) { + result = new JmsTopic(((Topic) destination).getTopicName()); + } + } + } + + return result; + } + + /** + * Creates a fast shallow copy of the current JmsMessage or creates a + * whole new message instance from an available JMS message from another + * provider. + * + * @param message + * - Message to be converted into Jms's implementation. + * @param connection + * @return JmsMessage - Jms's implementation object of the + * message. + * @throws JMSException + * if an error occurs + */ + public static JmsMessage transformMessage(JmsConnection connection, Message message) throws JMSException { + if (message instanceof JmsMessage) { + return ((JmsMessage) message).copy(); + } else { + JmsMessage activeMessage = null; + JmsMessageFactory factory = connection.getMessageFactory(); + + if (message instanceof BytesMessage) { + BytesMessage bytesMsg = (BytesMessage) message; + bytesMsg.reset(); + JmsBytesMessage msg = factory.createBytesMessage(); + try { + for (;;) { + // Reads a byte from the message stream until the stream + // is empty + msg.writeByte(bytesMsg.readByte()); + } + } catch (MessageEOFException e) { + // if an end of message stream as expected + } catch (JMSException e) { + } + + activeMessage = msg; + } else if (message instanceof MapMessage) { + MapMessage mapMsg = (MapMessage) message; + JmsMapMessage msg = factory.createMapMessage(); + Enumeration<?> iter = mapMsg.getMapNames(); + + while (iter.hasMoreElements()) { + String name = iter.nextElement().toString(); + msg.setObject(name, mapMsg.getObject(name)); + } + + activeMessage = msg; + } else if (message instanceof ObjectMessage) { + ObjectMessage objMsg = (ObjectMessage) message; + JmsObjectMessage msg = factory.createObjectMessage(); + msg.setObject(objMsg.getObject()); + activeMessage = msg; + } else if (message instanceof StreamMessage) { + StreamMessage streamMessage = (StreamMessage) message; + streamMessage.reset(); + JmsStreamMessage msg = factory.createStreamMessage(); + Object obj = null; + + try { + while ((obj = streamMessage.readObject()) != null) { + msg.writeObject(obj); + } + } catch (MessageEOFException e) { + // if an end of message stream as expected + } catch (JMSException e) { + } + + activeMessage = msg; + } else if (message instanceof TextMessage) { + TextMessage textMsg = (TextMessage) message; + JmsTextMessage msg = factory.createTextMessage(); + msg.setText(textMsg.getText()); + activeMessage = msg; + } else { + activeMessage = factory.createTextMessage(); + } + + copyProperties(connection, message, activeMessage); + + return activeMessage; + } + } + + /** + * Copies the standard JMS and user defined properties from the givem + * message to the specified message + * + * @param fromMessage + * the message to take the properties from + * @param toMessage + * the message to add the properties to + * @throws JMSException + */ + public static void copyProperties(JmsConnection connection, Message fromMessage, Message toMessage) throws JMSException { + toMessage.setJMSMessageID(fromMessage.getJMSMessageID()); + toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID()); + toMessage.setJMSReplyTo(transformDestination(connection, fromMessage.getJMSReplyTo())); + toMessage.setJMSDestination(transformDestination(connection, fromMessage.getJMSDestination())); + toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode()); + toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered()); + toMessage.setJMSType(fromMessage.getJMSType()); + toMessage.setJMSExpiration(fromMessage.getJMSExpiration()); + toMessage.setJMSPriority(fromMessage.getJMSPriority()); + toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp()); + + Enumeration<?> propertyNames = fromMessage.getPropertyNames(); + + while (propertyNames.hasMoreElements()) { + String name = propertyNames.nextElement().toString(); + Object obj = fromMessage.getObjectProperty(name); + toMessage.setObjectProperty(name, obj); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java new file mode 100644 index 0000000..e439764 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.message; + +import java.io.Serializable; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.ObjectMessage; + +import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade; + +/** + * An <CODE>ObjectMessage</CODE> object is used to send a message that contains a serializable + * object in the Java programming language ("Java object"). It inherits from the + * <CODE>Message</CODE> interface and adds a body containing a single reference to an object. + * Only <CODE>Serializable</CODE> Java objects can be used. + * <p/> + * <p/> + * If a collection of Java objects must be sent, one of the <CODE>Collection</CODE> classes + * provided since JDK 1.2 can be used. + * <p/> + * <p/> + * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only mode. If a client + * attempts to write to the message at this point, a <CODE>MessageNotWriteableException</CODE> + * is thrown. If <CODE>clearBody</CODE> is called, the message can now be both read from and + * written to. + * + * @see javax.jms.Session#createObjectMessage() + * @see javax.jms.Session#createObjectMessage(Serializable) + * @see javax.jms.BytesMessage + * @see javax.jms.MapMessage + * @see javax.jms.Message + * @see javax.jms.StreamMessage + * @see javax.jms.TextMessage + */ +public class JmsObjectMessage extends JmsMessage implements ObjectMessage { + + private final JmsObjectMessageFacade facade; + + public JmsObjectMessage(JmsObjectMessageFacade facade) { + super(facade); + this.facade = facade; + } + + @Override + public JmsObjectMessage copy() throws JMSException { + JmsObjectMessage other = new JmsObjectMessage(facade.copy()); + other.copy(this); + return other; + } + + /** + * Sets the serializable object containing this message's data. It is important to note that + * an <CODE>ObjectMessage</CODE> contains a snapshot of the object at the time + * <CODE>setObject()</CODE> is called; subsequent modifications of the object will have no + * effect on the <CODE>ObjectMessage</CODE> body. + * + * @param newObject + * the message's data + * @throws JMSException + * if the JMS provider fails to set the object due to some internal error. + * @throws javax.jms.MessageFormatException + * if object serialization fails. + * @throws javax.jms.MessageNotWriteableException + * if the message is in read-only mode. + */ + @Override + public void setObject(Serializable newObject) throws JMSException { + checkReadOnlyBody(); + try { + this.facade.setObject(newObject); + } catch (Exception e) { + throw new MessageFormatException("Failed to serialize object"); + } + } + + /** + * Gets the serializable object containing this message's data. The default value is null. + * + * @return the serializable object containing this message's data + * @throws JMSException + */ + @Override + public Serializable getObject() throws JMSException { + try { + return this.facade.getObject(); + } catch (Exception e) { + throw new MessageFormatException("Failed to read object"); + } + } + + @Override + public String toString() { + return super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java new file mode 100644 index 0000000..c77739b --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.message; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.meta.JmsProducerId; + +/** + * Envelope that wraps the objects involved in a Message send operation. + */ +public class JmsOutboundMessageDispatch { + + private JmsProducerId producerId; + private JmsMessage message; + private JmsDestination destination; + private boolean sendAsync; + + public JmsDestination getDestination() { + return destination; + } + + public void setDestination(JmsDestination destination) { + this.destination = destination; + } + + public JmsMessage getMessage() { + return message; + } + + public void setMessage(JmsMessage message) { + this.message = message; + } + + public JmsProducerId getProducerId() { + return producerId; + } + + public void setProducerId(JmsProducerId producerId) { + this.producerId = producerId; + } + + public void setSendAsync(boolean sendAsync) { + this.sendAsync = sendAsync; + } + + public boolean isSendAsync() { + return sendAsync; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java new file mode 100644 index 0000000..bd4f0fe --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java @@ -0,0 +1,485 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.message; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.StreamMessage; + +import org.apache.qpid.jms.message.facade.JmsStreamMessageFacade; + +/** + * JMS Stream message implementation. + */ +public class JmsStreamMessage extends JmsMessage implements StreamMessage { + + private static final int NO_BYTES_IN_FLIGHT = -1; + + private final JmsStreamMessageFacade facade; + + private byte[] bytes; + private int remainingBytes = NO_BYTES_IN_FLIGHT; + + public JmsStreamMessage(JmsStreamMessageFacade facade) { + super(facade); + this.facade = facade; + } + + @Override + public JmsStreamMessage copy() throws JMSException { + JmsStreamMessage other = new JmsStreamMessage(facade.copy()); + other.copy(this); + return other; + } + + @Override + public void onSend() throws JMSException { + super.onSend(); + reset(); + } + + @Override + public void clearBody() throws JMSException { + super.clearBody(); + bytes = null; + remainingBytes = -1; + } + + @Override + public boolean readBoolean() throws JMSException { + checkWriteOnlyBody(); + checkBytesInFlight(); + + Boolean result = null; + Object value; + value = facade.peek(); + + if (value instanceof Boolean) { + result = (Boolean) value; + } else if (value instanceof String || value == null) { + result = Boolean.valueOf((String) value); + } else { + throw new MessageFormatException( + "stream value: " + value.getClass().getSimpleName() + " cannot be converted to a boolean."); + } + + facade.pop(); + return result; + } + + @Override + public byte readByte() throws JMSException { + checkWriteOnlyBody(); + checkBytesInFlight(); + + Byte result = null; + Object value = facade.peek(); + + if (value instanceof Byte) { + result = (Byte) value; + } else if (value instanceof String || value == null) { + result = Byte.valueOf((String) value); + } else { + throw new MessageFormatException( + "stream value: " + value.getClass().getSimpleName() + " cannot be converted to a boolean."); + } + + facade.pop(); + return result; + } + + @Override + public short readShort() throws JMSException { + checkWriteOnlyBody(); + checkBytesInFlight(); + + Short result = null; + Object value = facade.peek(); + + if (value instanceof Short) { + result = (Short) value; + } else if (value instanceof Byte) { + result = ((Byte) value).shortValue(); + } else if (value instanceof String || value == null) { + result = Short.valueOf((String) value); + } else { + throw new MessageFormatException( + "stream value: " + value.getClass().getSimpleName() + " cannot be converted to a boolean."); + } + + facade.pop(); + return result; + } + + @Override + public char readChar() throws JMSException { + checkWriteOnlyBody(); + checkBytesInFlight(); + + Character result = null; + Object value = facade.peek(); + + if (value instanceof Character) { + result = (Character) value; + } else if (value == null) { + throw new NullPointerException("Cannot convert NULL value to char."); + } else { + throw new MessageFormatException( + "stream value: " + value.getClass().getSimpleName() + " cannot be converted to a boolean."); + } + + facade.pop(); + return result; + } + + @Override + public int readInt() throws JMSException { + checkWriteOnlyBody(); + checkBytesInFlight(); + + Integer result = null; + Object value = facade.peek(); + + if (value instanceof Integer) { + result = (Integer) value; + } else if (value instanceof Short) { + result = ((Short) value).intValue(); + } else if (value instanceof Byte) { + result = ((Byte) value).intValue(); + } else if (value instanceof String || value == null) { + result = Integer.valueOf((String) value); + } else { + throw new MessageFormatException( + "stream value: " + value.getClass().getSimpleName() + " cannot be converted to a boolean."); + } + + facade.pop(); + return result; + } + + @Override + public long readLong() throws JMSException { + checkWriteOnlyBody(); + checkBytesInFlight(); + + Long result = null; + Object value = facade.peek(); + + if (value instanceof Long) { + result = (Long) value; + } else if (value instanceof Integer) { + result = ((Integer) value).longValue(); + } else if (value instanceof Short) { + result = ((Short) value).longValue(); + } else if (value instanceof Byte) { + result = ((Byte) value).longValue(); + } else if (value instanceof String || value == null) { + result = Long.valueOf((String) value); + } else { + throw new MessageFormatException( + "stream value: " + value.getClass().getSimpleName() + " cannot be converted to a boolean."); + } + + facade.pop(); + return result; + } + + @Override + public float readFloat() throws JMSException { + checkWriteOnlyBody(); + checkBytesInFlight(); + + Float result = null; + Object value = facade.peek(); + + if (value instanceof Float) { + result = (Float) value; + } else if (value instanceof String || value == null) { + result = Float.valueOf((String) value); + } else { + throw new MessageFormatException( + "stream value: " + value.getClass().getSimpleName() + " cannot be converted to a boolean."); + } + + facade.pop(); + return result; + } + + @Override + public double readDouble() throws JMSException { + checkWriteOnlyBody(); + checkBytesInFlight(); + + Double result = null; + Object value = facade.peek(); + + if (value instanceof Double) { + result = (Double) value; + } else if (value instanceof Float) { + result = ((Float) value).doubleValue(); + } else if (value instanceof String || value == null) { + result = Double.valueOf((String) value); + } else { + throw new MessageFormatException( + "stream value: " + value.getClass().getSimpleName() + " cannot be converted to a boolean."); + } + + facade.pop(); + return result; + } + + @Override + public String readString() throws JMSException { + checkWriteOnlyBody(); + checkBytesInFlight(); + + String result = null; + Object value = facade.peek(); + + if (value == null) { + result = null; + } else if (value instanceof String) { + result = (String) value; + } else if (value instanceof Float) { + result = value.toString(); + } else if (value instanceof Double) { + result = value.toString(); + } else if (value instanceof Long) { + result = value.toString(); + } else if (value instanceof Integer) { + result = value.toString(); + } else if (value instanceof Short) { + result = value.toString(); + } else if (value instanceof Byte) { + result = value.toString(); + } else if (value instanceof Boolean) { + result = value.toString(); + } else if (value instanceof Character) { + result = value.toString(); + } else { + throw new MessageFormatException("stream cannot convert byte array to String"); + } + + facade.pop(); + return result; + } + + @Override + public int readBytes(byte[] target) throws JMSException { + checkWriteOnlyBody(); + + if (target == null) { + throw new NullPointerException("target byte array was null"); + } + + if (remainingBytes == NO_BYTES_IN_FLIGHT) { + Object data = facade.peek(); + if (data == null) { + facade.pop(); + return -1; + } else if (!(data instanceof byte[])) { + throw new MessageFormatException("Next stream value is not a byte array"); + } + + bytes = (byte[]) data; + remainingBytes = bytes.length; + } else if (remainingBytes == 0) { + // We previously read all the bytes, but must have filled the destination array. + remainingBytes = NO_BYTES_IN_FLIGHT; + bytes = null; + facade.pop(); + return -1; + } + + int previouslyRead = bytes.length - remainingBytes; + int lengthToCopy = Math.min(target.length, remainingBytes); + + if (lengthToCopy > 0) { + System.arraycopy(bytes, previouslyRead, target, 0, lengthToCopy); + } + + remainingBytes -= lengthToCopy; + + if (remainingBytes == 0 && lengthToCopy < target.length) { + // All bytes have been read and the destination array was not filled on this + // call, so the return will enable the caller to determine completion immediately. + remainingBytes = NO_BYTES_IN_FLIGHT; + bytes = null; + facade.pop(); + } + + return lengthToCopy; + } + + @Override + public Object readObject() throws JMSException { + checkWriteOnlyBody(); + checkBytesInFlight(); + + Object result = null; + Object value = facade.peek(); + + if (value == null) { + result = null; + } else if (value instanceof String) { + result = value; + } else if (value instanceof Float) { + result = value; + } else if (value instanceof Double) { + result = value; + } else if (value instanceof Long) { + result = value; + } else if (value instanceof Integer) { + result = value; + } else if (value instanceof Short) { + result = value; + } else if (value instanceof Byte) { + result = value; + } else if (value instanceof Boolean) { + result = value; + } else if (value instanceof Character) { + result = value; + } else if (value instanceof byte[]) { + byte[] original = (byte[]) value; + result = new byte[original.length]; + System.arraycopy(original, 0, result, 0, original.length); + } else { + throw new MessageFormatException("Unknown type found in stream"); + } + + facade.pop(); + return result; + } + + @Override + public void writeBoolean(boolean value) throws JMSException { + checkReadOnlyBody(); + facade.put(value); + } + + @Override + public void writeByte(byte value) throws JMSException { + checkReadOnlyBody(); + facade.put(value); + } + + @Override + public void writeShort(short value) throws JMSException { + checkReadOnlyBody(); + facade.put(value); + } + + @Override + public void writeChar(char value) throws JMSException { + checkReadOnlyBody(); + facade.put(value); + } + + @Override + public void writeInt(int value) throws JMSException { + checkReadOnlyBody(); + facade.put(value); + } + + @Override + public void writeLong(long value) throws JMSException { + checkReadOnlyBody(); + facade.put(value); + } + + @Override + public void writeFloat(float value) throws JMSException { + checkReadOnlyBody(); + facade.put(value); + } + + @Override + public void writeDouble(double value) throws JMSException { + checkReadOnlyBody(); + facade.put(value); + } + + @Override + public void writeString(String value) throws JMSException { + checkReadOnlyBody(); + facade.put(value); + } + + @Override + public void writeBytes(byte[] value) throws JMSException { + writeBytes(value, 0, value.length); + } + + @Override + public void writeBytes(byte[] value, int offset, int length) throws JMSException { + checkReadOnlyBody(); + byte[] copy = new byte[length]; + System.arraycopy(value, offset, copy, 0, length); + facade.put(copy); + } + + @Override + public void writeObject(Object value) throws JMSException { + checkReadOnlyBody(); + if (value == null) { + facade.put(null); + } else if (value instanceof String) { + facade.put(value); + } else if (value instanceof Character) { + facade.put(value); + } else if (value instanceof Boolean) { + facade.put(value); + } else if (value instanceof Byte) { + facade.put(value); + } else if (value instanceof Short) { + facade.put(value); + } else if (value instanceof Integer) { + facade.put(value); + } else if (value instanceof Long) { + facade.put(value); + } else if (value instanceof Float) { + facade.put(value); + } else if (value instanceof Double) { + facade.put(value); + } else if (value instanceof byte[]) { + writeBytes((byte[]) value); + } else { + throw new MessageFormatException("Unsupported Object type: " + value.getClass().getSimpleName()); + } + } + + @Override + public void reset() throws JMSException { + bytes = null; + remainingBytes = NO_BYTES_IN_FLIGHT; + setReadOnlyBody(true); + facade.reset(); + } + + @Override + public String toString() { + // TODO - Better toString() + return super.toString() + " JmsStreamMessage{ " + facade + " }"; + } + + private void checkBytesInFlight() throws MessageFormatException { + if (remainingBytes != NO_BYTES_IN_FLIGHT) { + throw new MessageFormatException( + "Partially read byte[] entry still being retrieved using readBytes(byte[] dest)"); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java new file mode 100644 index 0000000..a08550e --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.message; + +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import javax.jms.TextMessage; + +import org.apache.qpid.jms.message.facade.JmsTextMessageFacade; + +public class JmsTextMessage extends JmsMessage implements TextMessage { + + private final JmsTextMessageFacade facade; + + public JmsTextMessage(JmsTextMessageFacade facade) { + super(facade); + this.facade = facade; + } + + @Override + public JmsTextMessage copy() throws JMSException { + JmsTextMessage other = new JmsTextMessage(facade.copy()); + other.copy(this); + return other; + } + + private void copy(JmsTextMessage other) throws JMSException { + super.copy(other); + } + + @Override + public void setText(String text) throws JMSException, MessageNotWriteableException { + checkReadOnlyBody(); + this.facade.setText(text); + } + + @Override + public String getText() throws JMSException { + return facade.getText(); + } + + @Override + public String toString() { + + String text = ""; + try { + text = facade.getText(); + } catch (JMSException e) { + } + + // TODO - Better toString implementation + return super.toString() + ":text=" + text; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
