http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java new file mode 100644 index 0000000..1131829 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java @@ -0,0 +1,674 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MESSAGE; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.message.facade.JmsMessageFacade; +import org.apache.qpid.jms.meta.JmsMessageId; +import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.message.Message; + +/** + * + */ +public class AmqpJmsMessageFacade implements JmsMessageFacade { + + private static final int DEFAULT_PRIORITY = javax.jms.Message.DEFAULT_PRIORITY; + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final long MAX_UINT = 0xFFFFFFFFL; + + protected final Message message; + protected final AmqpConnection connection; + + private MessageAnnotations annotations; + private Map<Symbol,Object> annotationsMap; + private Map<String,Object> propertiesMap; + + private JmsDestination replyTo; + private JmsDestination destination; + + private Long syntheticTTL; + + /** + * Used to record the value of JMS_AMQP_TTL property + * if it is explicitly set by the application + */ + private Long userSpecifiedTTL = null; + + /** + * Create a new AMQP Message Facade with an empty message instance. + */ + public AmqpJmsMessageFacade(AmqpConnection connection) { + this.message = Proton.message(); + this.message.setDurable(true); + + this.connection = connection; + setAnnotation(JMS_MSG_TYPE, JMS_MESSAGE); + } + + /** + * Creates a new Facade around an incoming AMQP Message for dispatch to the + * JMS Consumer instance. + * + * @param connection + * the connection that created this Facade. + * @param message + * the incoming Message instance that is being wrapped. + */ + @SuppressWarnings("unchecked") + public AmqpJmsMessageFacade(AmqpConnection connection, Message message) { + this.message = message; + this.connection = connection; + + annotations = message.getMessageAnnotations(); + if (annotations != null) { + annotationsMap = annotations.getValue(); + } + + if (message.getApplicationProperties() != null) { + propertiesMap = message.getApplicationProperties().getValue(); + } + + Long ttl = message.getTtl(); + Long absoluteExpiryTime = getAbsoluteExpiryTime(); + if (absoluteExpiryTime == null && ttl != null) { + syntheticTTL = System.currentTimeMillis() + ttl; + } + + // TODO - Set destination + // TODO - Set replyTo + } + + /** + * @return the appropriate byte value that indicates the type of message this is. + */ + public byte getJmsMsgType() { + return JMS_MESSAGE; + } + + /** + * The annotation value for the JMS Message content type. For a generic JMS message this + * value is omitted so we return null here, subclasses should override this to return the + * correct content type value for their payload. + * + * @return a String value indicating the message content type. + */ + public String getContentType() { + return message.getContentType(); + } + + public void setContentType(String value) { + message.setContentType(value); + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public Map<String, Object> getProperties() throws JMSException { + lazyCreateProperties(); + return Collections.unmodifiableMap(new HashMap<String, Object>(propertiesMap)); + } + + @Override + public boolean propertyExists(String key) throws JMSException { + return AmqpJmsMessagePropertyIntercepter.getProperty(this, key) != null; + } + + public boolean applicationPropertyExists(String key) throws JMSException { + if (propertiesMap != null) { + return propertiesMap.containsKey(key); + } + + return false; + } + + /** + * Returns a set of all the property names that have been set in this message. + * + * @return a set of property names in the message or an empty set if none are set. + */ + public Set<String> getPropertyNames() { + Set<String> properties = AmqpJmsMessagePropertyIntercepter.getPropertyNames(this); + if (propertiesMap != null) { + properties.addAll(propertiesMap.keySet()); + } + return properties; + } + + @Override + public Object getProperty(String key) throws JMSException { + return AmqpJmsMessagePropertyIntercepter.getProperty(this, key); + } + + public Object getApplicationProperty(String key) throws JMSException { + if (propertiesMap != null) { + return propertiesMap.get(key); + } + + return null; + } + + @Override + public void setProperty(String key, Object value) throws JMSException { + if (key == null) { + throw new IllegalArgumentException("Property key must not be null"); + } + + AmqpJmsMessagePropertyIntercepter.setProperty(this, key, value); + } + + public void setApplicationProperty(String key, Object value) throws JMSException { + if (propertiesMap == null) { + lazyCreateProperties(); + } + + propertiesMap.put(key, value); + } + + @Override + public void onSend() throws JMSException { + String contentType = getContentType(); + byte jmsMsgType = getJmsMsgType(); + + if (contentType != null) { + message.setContentType(contentType); + } + setAnnotation(JMS_MSG_TYPE, jmsMsgType); + } + + @Override + public void clearBody() { + message.setBody(null); + } + + @Override + public void clearProperties() { + clearProperties(); + //_propJMS_AMQP_TTL = null; + message.setReplyToGroupId(null); + message.setUserId(null); + message.setGroupId(null); + setGroupSequence(0); + + // TODO - Clear others as needed. + } + + @Override + public JmsMessageFacade copy() throws JMSException { + AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection, message); + copyInto(copy); + return copy; + } + + protected void copyInto(AmqpJmsMessageFacade target) { + // TODO - Copy message. + } + + @Override + public JmsMessageId getMessageId() { + Object result = message.getMessageId(); + if (result != null) { + if (result instanceof String) { + return new JmsMessageId((String) result); + } else { + // TODO + throw new RuntimeException("No support for non-String IDs yet."); + } + } + + //TODO: returning a null JmsMessageId object leads to NPE during delivery processing + return null; + } + + @Override + public void setMessageId(JmsMessageId messageId) { + if (messageId != null) { + message.setMessageId(messageId.toString()); + } else { + message.setMessageId(null); + } + } + + @Override + public long getTimestamp() { + if (message.getProperties() != null) { + Date timestamp = message.getProperties().getCreationTime(); + if (timestamp != null) { + return timestamp.getTime(); + } + } + + return 0L; + } + + @Override + public void setTimestamp(long timestamp) { + if (message.getProperties() != null) { + if (timestamp != 0) { + message.setCreationTime(timestamp); + } else { + message.getProperties().setCreationTime(null); + } + } + } + + @Override + public String getCorrelationId() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setCorrelationId(String correlationId) { + // TODO Auto-generated method stub + + } + + @Override + public byte[] getCorrelationIdBytes() throws JMSException { + Object correlationId = message.getCorrelationId(); + if (correlationId == null) { + return null; + } else if (correlationId instanceof ByteBuffer) { + ByteBuffer dup = ((ByteBuffer) correlationId).duplicate(); + byte[] bytes = new byte[dup.remaining()]; + dup.get(bytes); + return bytes; + } else { + // TODO - Do we need to throw here, or could we just stringify whatever is in + // there and return the UTF-8 bytes? This method is pretty useless so + // maybe we just return something and let the user sort if out if they + // really think they need this. + throw new JMSException("The underlying correlation-id is not binary and so can't be returned"); + } + } + + @Override + public void setCorrelationIdBytes(byte[] correlationId) { + if (correlationId == null) { + message.setCorrelationId(correlationId); + } else { + byte[] bytes = Arrays.copyOf(correlationId, correlationId.length); + message.setCorrelationId(ByteBuffer.wrap(bytes)); + } + } + + @Override + public boolean isPersistent() { + return message.isDurable(); + } + + @Override + public void setPersistent(boolean value) { + this.message.setDurable(value); + } + + @Override + public int getRedeliveryCounter() { + if (message.getHeader() != null) { + UnsignedInteger count = message.getHeader().getDeliveryCount(); + if (count != null) { + return count.intValue(); + } + } + + return 0; + } + + @Override + public void setRedeliveryCounter(int redeliveryCount) { + if (redeliveryCount == 0) { + if (message.getHeader() != null) { + message.getHeader().setDeliveryCount(null); + } + } else { + message.setDeliveryCount(redeliveryCount); + } + } + + @Override + public boolean isRedelivered() { + return getRedeliveryCounter() > 0; + } + + @Override + public void setRedelivered(boolean redelivered) { + if (redelivered) { + if (!isRedelivered()) { + setRedeliveryCounter(1); + } + } else { + if (isRedelivered()) { + setRedeliveryCounter(0); + } + } + } + + @Override + public String getType() { + return (String) getAnnotation(JMS_MSG_TYPE); + } + + @Override + public void setType(String type) { + setAnnotation(JMS_MSG_TYPE, type); + } + + @Override + public byte getPriority() { + if (message.getHeader() != null) { + UnsignedByte priority = message.getHeader().getPriority(); + if (priority != null) { + return priority.byteValue(); + } + } + + return DEFAULT_PRIORITY; + } + + @Override + public void setPriority(byte priority) { + if (priority == DEFAULT_PRIORITY) { + if (message.getHeader() == null) { + return; + } else { + message.getHeader().setPriority(null); + } + } else { + message.setPriority(priority); + } + } + + @Override + public long getExpiration() { + Long absoluteExpiry = getAbsoluteExpiryTime(); + if (absoluteExpiry != null) { + return absoluteExpiry; + } + + if (syntheticTTL != null) { + return syntheticTTL; + } + + return 0; + } + + @Override + public void setExpiration(long expiration) { + syntheticTTL = null; + + if (expiration != 0) { + setAbsoluteExpiryTime(expiration); + } else { + setAbsoluteExpiryTime(null); + } + } + + public void setAmqpTimeToLive(Object value) throws MessageFormatException { + Long ttl = null; + if (value instanceof Long) { + ttl = (Long) value; + } + + if (ttl != null && ttl >= 0 && ttl <= MAX_UINT) { + userSpecifiedTTL = ttl; + } else { + throw new MessageFormatException(JMS_AMQP_TTL + " must be a long with value in range 0 to 2^31 - 1"); + } + } + + public long getAmqpTimeToLive() { + return userSpecifiedTTL; + } + + @Override + public JmsDestination getDestination() { + return destination; + } + + @Override + public void setDestination(JmsDestination destination) { + this.destination = destination; + + // TODO + } + + @Override + public JmsDestination getReplyTo() { + return replyTo; + } + + @Override + public void setReplyTo(JmsDestination replyTo) { + this.replyTo = replyTo; + // TODO Auto-generated method stub + } + + public void setReplyToGroupId(String replyToGroupId) { + message.setReplyToGroupId(replyToGroupId); + } + + public String getReplyToGroupId() { + return message.getReplyToGroupId(); + } + + @Override + public String getUserId() { + String userId = null; + byte[] userIdBytes = message.getUserId(); + + if (userIdBytes != null) { + userId = new String(userIdBytes, UTF8); + } + + return userId; + } + + @Override + public void setUserId(String userId) { + message.setUserId(userId.getBytes(UTF8)); + } + + @Override + public String getGroupId() { + return message.getGroupId(); + } + + @Override + public void setGroupId(String groupId) { + message.setGroupId(groupId); + } + + @Override + public int getGroupSequence() { + if (message.getProperties() != null) { + UnsignedInteger sequence = message.getProperties().getGroupSequence(); + if (sequence != null) { + return sequence.intValue(); + } + } + + return 0; + } + + @Override + public void setGroupSequence(int groupSequence) { + if (groupSequence < 0 && message.getProperties() != null) { + message.getProperties().setGroupSequence(null); + } else if (groupSequence > 0) { + message.setGroupSequence(groupSequence); + } + } + + /** + * @return the true AMQP Message instance wrapped by this Facade. + */ + public Message getAmqpMessage() { + return this.message; + } + + /** + * The AmqpConnection instance that is associated with this Message. + * @return + */ + public AmqpConnection getConnection() { + return connection; + } + + /** + * Checks for the presence of a given message annotation and returns true + * if it is contained in the current annotations. If the annotations have + * not yet been initialized then this method always returns false. + * + * @param key + * the name of the annotation to query for. + * + * @return true if the annotation is present, false in not or annotations not initialized. + */ + boolean annotationExists(String key) { + if (annotationsMap == null) { + return false; + } + + return annotationsMap.containsKey(AmqpMessageSupport.getSymbol(key)); + } + + /** + * Given an annotation name, lookup and return the value associated with that + * annotation name. If the message annotations have not been created yet then + * this method will always return null. + * + * @param key + * the Symbol name that should be looked up in the message annotations. + * + * @return the value of the annotation if it exists, or null if not set or not accessible. + */ + Object getAnnotation(String key) { + if (annotationsMap == null) { + return null; + } + + return annotationsMap.get(AmqpMessageSupport.getSymbol(key)); + } + + /** + * Removes a message annotation if the message contains it. Will not do + * a lazy create on the message annotations so caller cannot count on the + * existence of the message annotations after a call to this method. + * + * @param key + * the annotation key that is to be removed from the current set. + */ + void removeAnnotation(String key) { + if (annotationsMap == null) { + return; + } + + annotationsMap.remove(AmqpMessageSupport.getSymbol(key)); + } + + /** + * Perform a proper annotation set on the AMQP Message based on a Symbol key and + * the target value to append to the current annotations. + * + * @param key + * The name of the Symbol whose value is being set. + * @param value + * The new value to set in the annotations of this message. + */ + void setAnnotation(String key, Object value) { + lazyCreateAnnotations(); + annotationsMap.put(AmqpMessageSupport.getSymbol(key), value); + } + + /** + * Removes all message annotations from this message. + */ + void clearAnnotations() { + annotationsMap = null; + annotations = null; + message.setMessageAnnotations(null); + } + + /** + * Removes all application level properties from the Message. + */ + void clearAllApplicationProperties() { + propertiesMap = null; + message.setApplicationProperties(null); + } + + private Long getAbsoluteExpiryTime() { + Long result = null; + if (message.getProperties() != null) { + Date date = message.getProperties().getAbsoluteExpiryTime(); + if (date != null) { + result = date.getTime(); + } + } + + return result; + } + + private void setAbsoluteExpiryTime(Long expiration) { + if (expiration == null) { + if (message.getProperties() != null) { + message.getProperties().setAbsoluteExpiryTime(null); + } + } else { + message.setExpiryTime(expiration); + } + } + + private void lazyCreateAnnotations() { + if (annotationsMap == null) { + annotationsMap = new HashMap<Symbol,Object>(); + annotations = new MessageAnnotations(annotationsMap); + message.setMessageAnnotations(annotations); + } + } + + private void lazyCreateProperties() { + propertiesMap = new HashMap<String,Object>(); + message.setApplicationProperties(new ApplicationProperties(propertiesMap)); + } +}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java new file mode 100644 index 0000000..882c2ac --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import java.io.IOException; +import java.io.Serializable; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.exceptions.JmsExceptionSupport; +import org.apache.qpid.jms.message.JmsBytesMessage; +import org.apache.qpid.jms.message.JmsMapMessage; +import org.apache.qpid.jms.message.JmsMessage; +import org.apache.qpid.jms.message.JmsMessageFactory; +import org.apache.qpid.jms.message.JmsObjectMessage; +import org.apache.qpid.jms.message.JmsStreamMessage; +import org.apache.qpid.jms.message.JmsTextMessage; +import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade; +import org.apache.qpid.jms.message.facade.JmsTextMessageFacade; +import org.apache.qpid.jms.message.facade.defaults.JmsDefaultBytesMessageFacade; +import org.apache.qpid.jms.message.facade.defaults.JmsDefaultMapMessageFacade; +import org.apache.qpid.jms.message.facade.defaults.JmsDefaultMessageFacade; +import org.apache.qpid.jms.message.facade.defaults.JmsDefaultObjectMessageFacade; +import org.apache.qpid.jms.message.facade.defaults.JmsDefaultStreamMessageFacade; +import org.apache.qpid.jms.message.facade.defaults.JmsDefaultTextMessageFacade; +import org.apache.qpid.jms.provider.amqp.AmqpConnection; + +/** + * AMQP Message Factory instance used to create new JmsMessage types that wrap an + * Proton AMQP Message. This class is used by the JMS layer to create its JMS + * Message instances, the messages returned here should be created in a proper + * initially empty state for the client to populate. + */ +public class AmqpJmsMessageFactory implements JmsMessageFactory { + + private AmqpConnection connection; + + public AmqpJmsMessageFactory() { + } + + public AmqpJmsMessageFactory(AmqpConnection connection) { + this.connection = connection; + } + + public AmqpConnection getAmqpConnection() { + return this.connection; + } + + public void setAmqpConnection(AmqpConnection connection) { + this.connection = connection; + } + + @Override + public JmsMessage createMessage() throws JMSException { + //return new JmsMessage(new AmqpJmsMessageFacade(connection)); + return new JmsMessage(new JmsDefaultMessageFacade()); + } + + @Override + public JmsTextMessage createTextMessage() throws JMSException { + return createTextMessage(null); + } + + @Override + public JmsTextMessage createTextMessage(String payload) throws JMSException { + + // JmsTextMessageFacade facade = new AmqpJmsTextMessageFacade(connection); + JmsTextMessageFacade facade = new JmsDefaultTextMessageFacade(); + + if (payload != null) { + facade.setText(payload); + } + + return new JmsTextMessage(facade); + } + + @Override + public JmsBytesMessage createBytesMessage() throws JMSException { + // return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(connection)); + return new JmsBytesMessage(new JmsDefaultBytesMessageFacade()); + } + + @Override + public JmsMapMessage createMapMessage() throws JMSException { + // return new JmsMapMessage(new AmqpJmsMapMessageFacade(connection)); + return new JmsMapMessage(new JmsDefaultMapMessageFacade()); + } + + @Override + public JmsStreamMessage createStreamMessage() throws JMSException { + // return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(connection)); + return new JmsStreamMessage(new JmsDefaultStreamMessageFacade()); + } + + @Override + public JmsObjectMessage createObjectMessage() throws JMSException { + return createObjectMessage(null); + } + + @Override + public JmsObjectMessage createObjectMessage(Serializable payload) throws JMSException { + + // JmsObjectMessageFacade facade = new AmqpJmsSerializedObjectMessageFacade(connection); + JmsObjectMessageFacade facade = new JmsDefaultObjectMessageFacade(); + + if (payload != null) { + try { + facade.setObject(payload); + } catch (IOException e) { + throw JmsExceptionSupport.create(e); + } + } + + return new JmsObjectMessage(facade); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java new file mode 100644 index 0000000..11d10a3 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TYPED_ENCODING; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import org.apache.qpid.jms.util.TypeConversionSupport; + +/** + * Utility class used to intercept calls to Message property sets and gets and map the + * correct AMQP fields to the property name being accessed. + */ +public class AmqpJmsMessagePropertyIntercepter { + + private static final Map<String, PropertyIntercepter> PROPERTY_INTERCEPTERS = new HashMap<String, PropertyIntercepter>(); + + /** + * Interface for a Property intercepter object used to write JMS style + * properties that are part of the JMS Message object members or perform + * some needed conversion action before some named property is read or + * written. If a property is not writable then the intercepter should + * throw an JMSException to indicate the error. + */ + interface PropertyIntercepter { + + /** + * Called when the names property is queried from an JMS Message object. + * + * @param message + * The message being acted upon. + * + * @return the correct property value from the given Message. + * + * @throws JMSException if an error occurs while accessing the property + */ + Object getProperty(AmqpJmsMessageFacade message) throws JMSException; + + /** + * Called when the names property is assigned from an JMS Message object. + * + * @param message + * The message instance being acted upon. + * @param value + * The value to assign to the intercepted property. + * + * @throws JMSException if an error occurs writing the property. + */ + void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException; + + /** + * Indicates if the intercepted property has a value currently assigned. + * + * @param message + * The message instance being acted upon. + * + * @return true if the intercepted property has a value assigned to it. + */ + boolean propertyExists(AmqpJmsMessageFacade message); + + } + + static { + PROPERTY_INTERCEPTERS.put(JMS_AMQP_TTL, new PropertyIntercepter() { + @Override + public Object getProperty(AmqpJmsMessageFacade message) throws JMSException { + return message.getAmqpTimeToLive(); + } + + @Override + public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException { + Long rc = (Long) TypeConversionSupport.convert(value, Long.class); + if (rc == null) { + throw new JMSException("Property " + JMS_AMQP_TTL + " cannot be set from a " + value.getClass().getName() + "."); + } + message.setAmqpTimeToLive(rc.longValue()); + } + + @Override + public boolean propertyExists(AmqpJmsMessageFacade message) { + return message.getAmqpTimeToLive() != 0; + } + }); + PROPERTY_INTERCEPTERS.put(JMS_AMQP_REPLY_TO_GROUP_ID, new PropertyIntercepter() { + @Override + public Object getProperty(AmqpJmsMessageFacade message) throws JMSException { + return message.getReplyToGroupId(); + } + + @Override + public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException { + String rc = (String) TypeConversionSupport.convert(value, String.class); + if (rc == null) { + throw new JMSException("Property " + JMS_AMQP_REPLY_TO_GROUP_ID + " cannot be set from a " + value.getClass().getName() + "."); + } + message.setReplyToGroupId(rc); + } + + @Override + public boolean propertyExists(AmqpJmsMessageFacade message) { + return message.getReplyToGroupId() != null; + } + }); + PROPERTY_INTERCEPTERS.put(JMS_AMQP_TYPED_ENCODING, new PropertyIntercepter() { + @Override + public Object getProperty(AmqpJmsMessageFacade message) throws JMSException { + if (message instanceof AmqpJmsObjectMessageFacade) { + return ((AmqpJmsObjectMessageFacade) message).isAmqpTypedEncoding(); + } + + return false; + } + + @Override + public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException { + Integer rc = (Integer) TypeConversionSupport.convert(value, Boolean.class); + if (rc == null) { + throw new JMSException("Property " + JMS_AMQP_TYPED_ENCODING + " cannot be set from a " + value.getClass().getName() + "."); + } + + // TODO - Finished Typed encoding work. + if (message instanceof AmqpJmsObjectMessageFacade) { + // ((AmqpJmsSerializedObjectMessageFacade) message) + } else { + throw new MessageFormatException(JMS_AMQP_TYPED_ENCODING + " is only applicable to ObjectMessage"); + } + } + + @Override + public boolean propertyExists(AmqpJmsMessageFacade message) { + if (message instanceof AmqpJmsObjectMessageFacade) { + // TODO - See notes in AmqpObjectMessageFacade about whether this should + // always be exposed for ObjectMessage or only if it's currently + // the case that the message uses the AMQP typed encoding. + return ((AmqpJmsObjectMessageFacade) message).isAmqpTypedEncoding(); + } + + return false; + } + }); + } + + /** + * Static get method that takes a property name and gets the value either via + * a registered property get object or through the AmqpJmsMessageFacade getProperty + * method. + * + * @param message + * the AmqpJmsMessageFacade instance to read from + * @param name + * the property name that is being requested. + * + * @return the correct value either mapped to an attribute of a Message or a message property. + * + * @throws JMSException if an error occurs while reading the defined property. + */ + public static Object getProperty(AmqpJmsMessageFacade message, String name) throws JMSException { + Object value = null; + + PropertyIntercepter propertyExpression = PROPERTY_INTERCEPTERS.get(name); + if (propertyExpression != null) { + value = propertyExpression.getProperty(message); + } else { + value = message.getApplicationProperty(name); + } + + return value; + } + + /** + * Static set method that takes a property name and sets the value either via + * a registered property set object or through the AmqpJmsMessageFacade setProperty + * method. + * + * @param message + * the AmqpJmsMessageFacade instance to write to. + * @param name + * the property name that is being written. + * @param value + * the new value to assign for the named property. + * + * @throws JMSException if an error occurs while writing the defined property. + */ + public static void setProperty(AmqpJmsMessageFacade message, String name, Object value) throws JMSException { + PropertyIntercepter propertyExpression = PROPERTY_INTERCEPTERS.get(name); + if (propertyExpression != null) { + propertyExpression.setProperty(message, value); + } else { + message.setApplicationProperty(name, value); + } + } + + /** + * Static query method to determine if a specific property exists in the given message. + * + * @param message + * the AmqpJmsMessageFacade instance to write to. + * @param name + * the property name that is being checked. + * + * @throws JMSException if an error occurs while inspecting the defined property. + */ + public static void propertyExists(AmqpJmsMessageFacade message, String name) throws JMSException { + PropertyIntercepter propertyExpression = PROPERTY_INTERCEPTERS.get(name); + if (propertyExpression != null) { + propertyExpression.propertyExists(message); + } else { + message.applicationPropertyExists(name); + } + } + + /** + * For each of the currently configured message property intercepter instance a + * string key value is inserted into an Set and returned. + * + * @return a Set<String> containing the names of all intercepted properties. + */ + public static Set<String> getAllPropertyNames() { + return PROPERTY_INTERCEPTERS.keySet(); + } + + /** + * For each of the currently configured message property intercepter instance a + * string key value is inserted into an Set and returned if the property has a + * value and is available for a read operation. + * + * @return a Set<String> containing the names of all intercepted properties with a value. + */ + public static Set<String> getPropertyNames(AmqpJmsMessageFacade message) { + Set<String> names = new HashSet<String>(); + for (Entry<String, PropertyIntercepter> entry : PROPERTY_INTERCEPTERS.entrySet()) { + if (entry.getValue().propertyExists(message)) { + names.add(entry.getKey()); + } + } + return names; + } + + /** + * Allows for the additional PropertyIntercepter instances to be added to the global set. + * + * @param propertyName + * The name of the Message property that will be intercepted. + * @param getter + * The PropertyIntercepter instance that should be used for the named property. + */ + public static void addPropertyIntercepter(String propertyName, PropertyIntercepter getter) { + PROPERTY_INTERCEPTERS.put(propertyName, getter); + } + + /** + * Given a property name, remove the configured intercepter that has been assigned to + * intercept calls for that property value. + * + * @param propertyName + * The name of the PropertyIntercepter to remove. + * + * @return true if a getter was removed from the global set. + */ + public boolean removePropertyIntercepter(String propertyName) { + if (PROPERTY_INTERCEPTERS.remove(propertyName) != null) { + return true; + } + + return false; + } + + private final String name; + private final PropertyIntercepter propertyExpression; + + /** + * Creates an new property getter instance that is assigned to read the named value. + * + * @param name + * the property value that this getter is assigned to lookup. + */ + public AmqpJmsMessagePropertyIntercepter(String name) { + this.name = name; + this.propertyExpression = PROPERTY_INTERCEPTERS.get(name); + } + + /** + * Gets the correct property value from the JmsMessageFacade instance based on + * the predefined property mappings. + * + * @param message + * the JmsMessageFacade whose property is being read. + * + * @return the correct value either mapped to an Message attribute of a Message property. + * + * @throws JMSException if an error occurs while reading the defined property. + */ + public Object get(AmqpJmsMessageFacade message) throws JMSException { + if (propertyExpression != null) { + return propertyExpression.getProperty(message); + } + + return message.getApplicationProperty(name); + } + + /** + * Sets the correct property value from the AmqpJmsMessageFacade instance based on + * the predefined property mappings. + * + * @param message + * the AmqpJmsMessageFacade whose property is being read. + * @param value + * the value to be set on the intercepted AmqpJmsMessageFacade property. + * + * @throws JMSException if an error occurs while reading the defined property. + */ + public void set(AmqpJmsMessageFacade message, Object value) throws JMSException { + if (propertyExpression != null) { + propertyExpression.setProperty(message, value); + } else { + message.setApplicationProperty(name, value); + } + } + + /** + * @return the property name that is being intercepted for the AmqpJmsMessageFacade. + */ + public String getName() { + return name; + } + + /** + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return name; + } + + /** + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return name.hashCode(); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object o) { + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return name.equals(((AmqpJmsMessagePropertyIntercepter) o).name); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java new file mode 100644 index 0000000..3696653 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_OBJECT_MESSAGE; + +import java.io.IOException; +import java.io.Serializable; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.exceptions.JmsExceptionSupport; +import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade; +import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.proton.message.Message; + +/** + * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage + * type. + */ +public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements JmsObjectMessageFacade { + + private AmqpObjectTypeDelegate delegate; + + /** + * @param connection + */ + public AmqpJmsObjectMessageFacade(AmqpConnection connection) { + super(connection); + setAnnotation(JMS_MSG_TYPE, JMS_OBJECT_MESSAGE); + + // TODO Implement Connection property to control default serialization type + initDelegate(false); + } + + /** + * @param connection + * @param message + */ + public AmqpJmsObjectMessageFacade(AmqpConnection connection, Message message) { + super(connection, message); + + // TODO detect the content type and init the proper delegate. + initDelegate(false); + } + + /** + * @return the appropriate byte value that indicates the type of message this is. + */ + @Override + public byte getJmsMsgType() { + return JMS_OBJECT_MESSAGE; + } + + @Override + public boolean isEmpty() { + // TODO - If null body changes to empty AmqpValue this needs to also change. + return getAmqpMessage().getBody() == null; + } + + public boolean isAmqpTypedEncoding() { + return this.delegate instanceof AmqpObjectTypeDelegate; + } + + @Override + public JmsObjectMessageFacade copy() throws JMSException { + AmqpJmsObjectMessageFacade copy = new AmqpJmsObjectMessageFacade(connection); + copyInto(copy); + + try { + copy.setObject(getObject()); + } catch (Exception e) { + throw JmsExceptionSupport.create("Failed to copy object value", e); + } + + return copy; + } + + @Override + public Serializable getObject() throws IOException, ClassNotFoundException { + return delegate.getObject(); + } + + @Override + public void setObject(Serializable value) throws IOException { + delegate.setObject(value); + } + + @Override + public void clearBody() { + try { + setObject(null); + } catch (IOException e) { + } + } + + @Override + public void onSend() { + // TODO instruct delegate to encode the proper content type into the message. + } + + private void initDelegate(boolean useAmqpTypes) { + if (!useAmqpTypes) { + delegate = new AmqpSerializedObjectDelegate(getAmqpMessage()); + } else { + delegate = new AmqpTypedObjectDelegate(getAmqpMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java new file mode 100644 index 0000000..0999225 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_STREAM_MESSAGE; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.jms.MessageEOFException; + +import org.apache.qpid.jms.message.facade.JmsStreamMessageFacade; +import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.Message; + +/** + * Wrapper around an AMQP Message instance that will be treated as a JMS StreamMessage + * type. + */ +public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements JmsStreamMessageFacade { + + private List<Object> list; + private int position = 0; + + /** + * Create a new facade ready for sending. + * + * @param connection + * the connection instance that created this facade. + */ + public AmqpJmsStreamMessageFacade(AmqpConnection connection) { + super(connection); + initializeEmptyList(); + setAnnotation(JMS_MSG_TYPE, JMS_STREAM_MESSAGE); + } + + /** + * Creates a new Facade around an incoming AMQP Message for dispatch to the + * JMS Consumer instance. + * + * @param connection + * the connection that created this Facade. + * @param message + * the incoming Message instance that is being wrapped. + */ + @SuppressWarnings("unchecked") + public AmqpJmsStreamMessageFacade(AmqpConnection connection, Message message) { + super(connection, message); + + Section body = getAmqpMessage().getBody(); + if (body == null) { + initializeEmptyList(); + } else if (body instanceof AmqpValue) { + Object value = ((AmqpValue) body).getValue(); + + if (value == null) { + initializeEmptyList(); + } else if (value instanceof List) { + list = (List<Object>) value; + } else { + throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName()); + } + } else { + throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName()); + } + } + + @Override + public JmsStreamMessageFacade copy() { + AmqpJmsStreamMessageFacade copy = new AmqpJmsStreamMessageFacade(connection); + copyInto(copy); + copy.list.addAll(list); + return copy; + } + + /** + * @return the appropriate byte value that indicates the type of message this is. + */ + @Override + public byte getJmsMsgType() { + return JMS_STREAM_MESSAGE; + } + + @Override + public boolean hasNext() { + return !list.isEmpty() && position < list.size(); + } + + @Override + public Object peek() throws MessageEOFException { + if (list.isEmpty() || position >= list.size()) { + throw new MessageEOFException("Attempt to read past end of stream"); + } + + Object object = list.get(position); + if (object instanceof Binary) { + // Copy to a byte[], ensure we copy only the required portion. + Binary bin = ((Binary) object); + object = Arrays.copyOfRange(bin.getArray(), bin.getArrayOffset(), bin.getLength()); + } + + return object; + } + + @Override + public void pop() throws MessageEOFException { + if (list.isEmpty() || position >= list.size()) { + throw new MessageEOFException("Attempt to read past end of stream"); + } + + position++; + } + + @Override + public void put(Object value) { + Object entry = value; + if (entry instanceof byte[]) { + entry = new Binary((byte[]) value); + } + + list.add(entry); + } + + @Override + public void reset() { + position = 0; + } + + @Override + public void clearBody() { + list.clear(); + position = 0; + } + + @Override + public boolean isEmpty() { + return list.isEmpty(); + } + + private void initializeEmptyList() { + List<Object> list = new ArrayList<Object>(); + message.setBody(new AmqpValue(list)); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java new file mode 100644 index 0000000..6c2421b --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.exceptions.JmsExceptionSupport; +import org.apache.qpid.jms.message.facade.JmsTextMessageFacade; +import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.Message; + +/** + * Wrapper around an AMQP Message instance that will be treated as a JMS TextMessage + * type. + */ +public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements JmsTextMessageFacade { + + private static final String UTF_8 = "UTF-8"; + + /** + * Content type, only to be used when message uses a data + * body section, and not when using an amqp-value body section + */ + public static final String CONTENT_TYPE = "text/plain"; + + private final CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder(); + + /** + * Create a new AMQP Message facade ready for sending. + * + * @param connection + * The AMQP Connection that created this message. + */ + public AmqpJmsTextMessageFacade(AmqpConnection connection) { + super(connection); + setAnnotation(JMS_MSG_TYPE, JMS_TEXT_MESSAGE); + setText(null); + } + + /** + * Creates a new Facade around an incoming AMQP Message for dispatch to the + * JMS Consumer instance. + * + * @param connection + * the connection that created this Facade. + * @param message + * the incoming Message instance that is being wrapped. + */ + public AmqpJmsTextMessageFacade(AmqpConnection connection, Message message) { + super(connection, message); + } + + /** + * @return the appropriate byte value that indicates the type of message this is. + */ + @Override + public byte getJmsMsgType() { + return JMS_TEXT_MESSAGE; + } + + @Override + public JmsTextMessageFacade copy() throws JMSException { + AmqpJmsTextMessageFacade copy = new AmqpJmsTextMessageFacade(connection); + copyInto(copy); + copy.setText(getText()); + return copy; + } + + @Override + public String getText() throws JMSException { + Section body = getAmqpMessage().getBody(); + + if (body == null) { + return null; + } else if (body instanceof Data) { + Data data = (Data) body; + if (data.getValue() == null || data.getValue().getLength() == 0) { + return ""; + } else { + Binary b = data.getValue(); + ByteBuffer buf = ByteBuffer.wrap(b.getArray(), b.getArrayOffset(), b.getLength()); + + try { + CharBuffer chars = decoder.decode(buf); + return String.valueOf(chars); + } catch (CharacterCodingException e) { + throw JmsExceptionSupport.create("Cannot decode String in UFT-8", e); + } + } + } else if (body instanceof AmqpValue) { + Object value = ((AmqpValue) body).getValue(); + + if (value == null || value instanceof String) { + return (String) value; + } else { + throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName()); + } + } else { + throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName()); + } + } + + @Override + public void setText(String value) { + AmqpValue body = new AmqpValue(value); + getAmqpMessage().setBody(body); + } + + @Override + public void clearBody() { + setText(null); + } + + @Override + public boolean isEmpty() { + Section body = getAmqpMessage().getBody(); + + if (body == null) { + return true; + } else if (body instanceof Data) { + Data data = (Data) body; + if (data.getValue() == null || data.getValue().getLength() == 0) { + return true; + } + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java new file mode 100644 index 0000000..0bda795 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java @@ -0,0 +1,270 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms.provider.amqp.message; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.UUID; + +import org.apache.qpid.jms.exceptions.IdConversionException; + +/** + * Helper class for identifying and converting message-id and correlation-id values between + * the AMQP types and the Strings values used by JMS. + * + * <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary, + * message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these + * for interoperability with other AMQP clients, the following encoding can be used after removing or + * before adding the "ID:" prefix used for a JMSMessageID value<br/> + * + * "AMQP_BINARY:<hex representation of binary content>"<br/> + * "AMQP_UUID:<string representation of uuid>"<br/> + * "AMQP_ULONG:<string representation of ulong>"<br/> + * "AMQP_STRING:<string>"<br/> + * + * <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin + * with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise. + * + * <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or + * ulong but can't be converted into the indicated format, an exception will be thrown. + * + */ +public class AmqpMessageIdHelper { + public static final String AMQP_STRING_PREFIX = "AMQP_STRING:"; + public static final String AMQP_UUID_PREFIX = "AMQP_UUID:"; + public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:"; + public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:"; + public static final String JMS_ID_PREFIX = "ID:"; + + private static final int JMS_ID_PREFIX_LENGTH = JMS_ID_PREFIX.length(); + private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length(); + private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length(); + private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length(); + private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length(); + private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray(); + + /** + * Checks whether the given string begins with "ID:" prefix used to denote a JMSMessageID + * + * @param string the string to check + * @return true if and only id the string begins with "ID:" + */ + public boolean hasMessageIdPrefix(String string) { + if (string == null) { + return false; + } + + return string.startsWith(JMS_ID_PREFIX); + } + + /** + * Returns the suffix of the given string after removing the first "ID:" prefix (if present). + * + * @param string the string to process + * @return the suffix, or the original String if the "ID:" prefix is not present + */ + public String stripMessageIdPrefix(String id) { + if (hasMessageIdPrefix(id)) { + return strip(id, JMS_ID_PREFIX_LENGTH); + } else { + return id; + } + } + + private String strip(String id, int numChars) { + return id.substring(numChars); + } + + /** + * Takes the provided amqp messageId style object, and convert it to a base string. + * Encodes type information as a prefix where necessary to convey or escape the type + * of the provided object. + * + * @param messageId the object to process + * @return the base string to be used in creating the actual JMS id. + */ + public String toBaseMessageIdString(Object messageId) { + if (messageId == null) { + return null; + } else if (messageId instanceof String) { + String stringId = (String) messageId; + + // If the given string has a type encoding prefix, + // we need to escape it as an encoded string (even if + // the existing encoding prefix was also for string) + if (hasTypeEncodingPrefix(stringId)) { + return AMQP_STRING_PREFIX + stringId; + } else { + return stringId; + } + } else if (messageId instanceof UUID) { + return AMQP_UUID_PREFIX + messageId.toString(); + } else if (messageId instanceof BigInteger || messageId instanceof Long) { + return AMQP_ULONG_PREFIX + messageId.toString(); + } else if (messageId instanceof ByteBuffer) { + ByteBuffer dup = ((ByteBuffer) messageId).duplicate(); + + byte[] bytes = new byte[dup.remaining()]; + dup.get(bytes); + + String hex = convertBinaryToHexString(bytes); + + return AMQP_BINARY_PREFIX + hex; + } else { + throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass()); + } + } + + private boolean hasTypeEncodingPrefix(String stringId) { + return hasAmqpBinaryPrefix(stringId) || + hasAmqpUuidPrefix(stringId) || + hasAmqpUlongPrefix(stringId) || + hasAmqpStringPrefix(stringId); + } + + private boolean hasAmqpStringPrefix(String stringId) { + return stringId.startsWith(AMQP_STRING_PREFIX); + } + + private boolean hasAmqpUlongPrefix(String stringId) { + return stringId.startsWith(AMQP_ULONG_PREFIX); + } + + private boolean hasAmqpUuidPrefix(String stringId) { + return stringId.startsWith(AMQP_UUID_PREFIX); + } + + private boolean hasAmqpBinaryPrefix(String stringId) { + return stringId.startsWith(AMQP_BINARY_PREFIX); + } + + /** + * Takes the provided base id string and return the appropriate amqp messageId style object. + * Converts the type based on any relevant encoding information found as a prefix. + * + * @param baseId the object to be converted + * @return the amqp messageId style object + * @throws IdConversionException if the provided baseId String indicates an encoded type but can't be converted to that type. + */ + public Object toIdObject(String baseId) throws IdConversionException { + if (baseId == null) { + return null; + } + + try { + if (hasAmqpUuidPrefix(baseId)) { + String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH); + return UUID.fromString(uuidString); + } else if (hasAmqpUlongPrefix(baseId)) { + String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH); + return new BigInteger(longString); + } else if (hasAmqpStringPrefix(baseId)) { + return strip(baseId, AMQP_STRING_PREFIX_LENGTH); + } else if (hasAmqpBinaryPrefix(baseId)) { + String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH); + byte[] bytes = convertHexStringToBinary(hexString); + return ByteBuffer.wrap(bytes); + } else { + // We have a string without any type prefix, transmit it as-is. + return baseId; + } + } catch (IllegalArgumentException e) { + throw new IdConversionException("Unable to convert ID value", e); + } + } + + /** + * Convert the provided hex-string into a binary representation where each byte represents + * two characters of the hex string. + * + * The hex characters may be upper or lower case. + * + * @param hexString string to convert + * @return a byte array containing the binary representation + * @throws IllegalArgumentException if the provided String is a non-even length or contains non-hex characters + */ + public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException { + int length = hexString.length(); + + // As each byte needs two characters in the hex encoding, the string must be an even length. + if (length % 2 != 0) { + throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString); + } + + byte[] binary = new byte[length / 2]; + + for (int i = 0; i < length; i += 2) { + char highBitsChar = hexString.charAt(i); + char lowBitsChar = hexString.charAt(i + 1); + + int highBits = hexCharToInt(highBitsChar, hexString) << 4; + int lowBits = hexCharToInt(lowBitsChar, hexString); + + binary[i / 2] = (byte) (highBits + lowBits); + } + + return binary; + } + + private int hexCharToInt(char ch, String orig) throws IllegalArgumentException { + if (ch >= '0' && ch <= '9') { + // subtract '0' to get difference in position as an int + return ch - '0'; + } else if (ch >= 'A' && ch <= 'F') { + // subtract 'A' to get difference in position as an int + // and then add 10 for the offset of 'A' + return ch - 'A' + 10; + } else if (ch >= 'a' && ch <= 'f') { + // subtract 'a' to get difference in position as an int + // and then add 10 for the offset of 'a' + return ch - 'a' + 10; + } + + throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig); + } + + /** + * Convert the provided binary into a hex-string representation where each character + * represents 4 bits of the provided binary, i.e each byte requires two characters. + * + * The returned hex characters are upper-case. + * + * @param bytes binary to convert + * @return a String containing a hex representation of the bytes + */ + public String convertBinaryToHexString(byte[] bytes) { + // Each byte is represented as 2 chars + StringBuilder builder = new StringBuilder(bytes.length * 2); + + for (byte b : bytes) { + // The byte will be expanded to int before shifting, replicating the + // sign bit, so mask everything beyond the first 4 bits afterwards + int highBitsInt = (b >> 4) & 0xF; + // We only want the first 4 bits + int lowBitsInt = b & 0xF; + + builder.append(HEX_CHARS[highBitsInt]); + builder.append(HEX_CHARS[lowBitsInt]); + } + + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java new file mode 100644 index 0000000..a01d415 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; + +import org.apache.qpid.proton.amqp.Symbol; + +/** + * Support class containing constant values and static methods that are + * used to map to / from AMQP Message types being sent or received. + */ +public final class AmqpMessageSupport { + + /** + * The Annotation name to store the destination name that the Message + * will be sent to. The Message should also be tagged with the appropriate + * destination attribute to allow the receiver to determine the correct + * destination type. + */ + public static final String AMQP_TO_ANNOTATION = "x-opt-to-type"; + + /** + * The Annotation name to store the destination name that the sender wants + * to receive replies on. The Message should also be tagged with the + * appropriate destination attribute to allow the receiver to determine the + * correct destination type. + */ + public static final String AMQP_REPLY_TO_ANNOTATION = "x-opt-reply-type"; + + /** + * Attribute used to mark a destination as temporary. + */ + public static final String TEMPORARY_ATTRIBUTE = "temporary"; + + /** + * Attribute used to mark a destination as being a Queue type. + */ + public static final String QUEUE_ATTRIBUTES = "queue"; + + /** + * Attribute used to mark a destination as being a Topic type. + */ + public static final String TOPIC_ATTRIBUTES = "topic"; + + /** + * Convenience value used to mark a destination as a Temporary Queue. + */ + public static final String TEMP_QUEUE_ATTRIBUTES = TEMPORARY_ATTRIBUTE + "," + QUEUE_ATTRIBUTES; + + /** + * Convenience value used to mark a destination as a Temporary Topic. + */ + public static final String TEMP_TOPIC_ATTRIBUTES = TEMPORARY_ATTRIBUTE + "," + TOPIC_ATTRIBUTES; + + /** + * Attribute used to mark the Application defined correlation Id that has been + * set for the message. + */ + public static final String JMS_APP_CORRELATION_ID = "x-opt-app-correlation-id"; + + /** + * Attribute used to mark the JMSType value set on the message. + */ + public static final String JMS_TYPE = "x-opt-jms-type"; + + /** + * Attribute used to mark the JMS Type that the message instance represents. + */ + public static final String JMS_MSG_TYPE = "x-opt-jms-msg-type"; + + /** + * Value mapping for JMS_MSG_TYPE which indicates the message is a generic JMS Message + * which has no body. + */ + public static final byte JMS_MESSAGE = 0; + + /** + * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS ObjectMessage + * which has an Object value serialized in its message body. + */ + public static final byte JMS_OBJECT_MESSAGE = 1; + + /** + * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS MapMessage + * which has an Map instance serialized in its message body. + */ + public static final byte JMS_MAP_MESSAGE = 2; + + /** + * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS BytesMessage + * which has a body that consists of raw bytes. + */ + public static final byte JMS_BYTES_MESSAGE = 3; + + /** + * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS StreamMessage + * which has a body that is a structured collection of primitives values. + */ + public static final byte JMS_STREAM_MESSAGE = 4; + + /** + * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS TextMessage + * which has a body that contains a UTF-8 encoded String. + */ + public static final byte JMS_TEXT_MESSAGE = 5; + + public static final String JMS_AMQP_TTL = "JMS_AMQP_TTL"; + public static final String JMS_AMQP_REPLY_TO_GROUP_ID = "JMS_AMQP_REPLY_TO_GROUP_ID"; + public static final String JMS_AMQP_TYPED_ENCODING = "JMS_AMQP_TYPED_ENCODING"; + + /** + * Lookup and return the correct Proton Symbol instance based on the given key. + * + * @param key + * the String value name of the Symbol to locate. + * + * @return the Symbol value that matches the given key. + */ + public static Symbol getSymbol(String key) { + return Symbol.valueOf(key); + } + + /** + * Given a JMS Destination object return the correct message annotations that + * will identify the type of Destination the name represents, Queue. Topic, etc. + * + * @param destination + * The JMS Destination to be examined. + * + * @return the correct message annotation values to describe the given Destination. + */ + public static String destinationAttributes(Destination destination) { + if (destination instanceof Queue) { + if (destination instanceof TemporaryQueue) { + return TEMP_QUEUE_ATTRIBUTES; + } else { + return QUEUE_ATTRIBUTES; + } + } + if (destination instanceof Topic) { + if (destination instanceof TemporaryTopic) { + return TEMP_TOPIC_ATTRIBUTES; + } else { + return TOPIC_ATTRIBUTES; + } + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java new file mode 100644 index 0000000..cfa6237 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Interface for a Delegate object that handles storing and retrieving the Object + * value in an Object message. + */ +public interface AmqpObjectTypeDelegate { + + /** + * Given a serializable instance, store the value into the AMQP message using + * the strategy implemented by this delegate. + * + * @param value + * A serializable object instance to be stored in the message. + * + * @throws IOException if an error occurs during the store operation. + */ + void setObject(Serializable value) throws IOException; + + /** + * Read a Serialized object from the AMQP message using the strategy implemented + * by this delegate. + * + * @return an Object that has been read from the stored object data in the message. + * + * @throws IOException if an error occurs while reading the stored object. + * @throws ClassNotFoundException if no class can be found for the stored type. + */ + Serializable getObject() throws IOException, ClassNotFoundException; + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java new file mode 100644 index 0000000..72db9dc --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import org.apache.qpid.jms.util.ClassLoadingAwareObjectInputStream; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.Message; + +/** + * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage + * type. + */ +public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate { + + public static final String CONTENT_TYPE = "application/x-java-serialized-object"; + + private final Message message; + + /** + * Create a new delegate that uses Java serialization to store the message content. + * + * @param message + * the AMQP message instance where the object is to be stored / read. + */ + public AmqpSerializedObjectDelegate(Message message) { + this.message = message; + this.message.setContentType(CONTENT_TYPE); + } + + @Override + public Serializable getObject() throws IOException, ClassNotFoundException { + Binary bin = null; + + Section body = message.getBody(); + if (body == null) { + return null; + } else if (body instanceof Data) { + bin = ((Data) body).getValue(); + } else { + throw new IllegalStateException("Unexpected body type: " + body.getClass().getSimpleName()); + } + + if (bin == null) { + return null; + } else { + Serializable serialized = null; + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bin.getArray(), bin.getArrayOffset(), bin.getLength()); + ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(bais)) { + + serialized = (Serializable) objIn.readObject(); + } + + return serialized; + } + } + + @Override + public void setObject(Serializable value) throws IOException { + if(value == null) { + // TODO: verify whether not sending a body is ok, + // send a serialized null instead if it isn't + message.setBody(null); + } else { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + + oos.writeObject(value); + oos.flush(); + oos.close(); + + byte[] bytes = baos.toByteArray(); + message.setBody(new Data(new Binary(bytes))); + } + } + + // TODO: ensure content type is [still] set? + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
