http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessage.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessage.java new file mode 100644 index 0000000..c955f41 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessage.java @@ -0,0 +1,1089 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.Message; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotWriteableException; + +import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientMessage; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.core.message.impl.MessageInternal; +import org.apache.activemq.reader.MessageUtil; +import org.apache.activemq.utils.UUID; + + +/** + * ActiveMQ implementation of a JMS Message. + * <br> + * JMS Messages only live on the client side - the server only deals with MessageImpl + * instances + * + * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> + * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> + * @author <a href="mailto:bersh...@yahoo.com">Tyronne Wickramarathne</a> Partially ported from JBossMQ implementation + * originally written by: + * @author Norbert Lataille (norbert.latai...@m4x.org) + * @author Hiram Chirino (cojonud...@hotmail.com) + * @author David Maplesden (david.maples...@orion.co.nz) + * @author <a href="mailto:adr...@jboss.org">Adrian Brock</a> + * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> + * @author <a href="mailto:clebert.suco...@jboss.org">Clebert Suconic</a> + */ +public class ActiveMQMessage implements javax.jms.Message +{ + // Constants ----------------------------------------------------- + public static final byte TYPE = org.apache.activemq.api.core.Message.DEFAULT_TYPE; + + public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage) + { + Map<String, Object> jmsMessage = new HashMap<String, Object>(); + + String deliveryMode = (Boolean)coreMessage.get("durable") ? "PERSISTENT" : "NON_PERSISTENT"; + byte priority = (Byte)coreMessage.get("priority"); + long timestamp = (Long)coreMessage.get("timestamp"); + long expiration = (Long)coreMessage.get("expiration"); + + jmsMessage.put("JMSPriority", priority); + jmsMessage.put("JMSTimestamp", timestamp); + jmsMessage.put("JMSExpiration", expiration); + jmsMessage.put("JMSDeliveryMode", deliveryMode); + + for (Map.Entry<String, Object> entry : coreMessage.entrySet()) + { + if (entry.getKey().equals("type") || entry.getKey().equals("durable") || + entry.getKey().equals("expiration") || + entry.getKey().equals("timestamp") || + entry.getKey().equals("priority")) + { + // Ignore + } + else if (entry.getKey().equals("userID")) + { + jmsMessage.put("JMSMessageID", entry.getValue().toString()); + } + else + { + Object value = entry.getValue(); + if (value instanceof SimpleString) + { + jmsMessage.put(entry.getKey(), value.toString()); + } + else + { + jmsMessage.put(entry.getKey(), value); + } + } + } + + return jmsMessage; + } + + // Static -------------------------------------------------------- + + private static final HashSet<String> reservedIdentifiers = new HashSet<String>(); + static + { + ActiveMQMessage.reservedIdentifiers.add("NULL"); + ActiveMQMessage.reservedIdentifiers.add("TRUE"); + ActiveMQMessage.reservedIdentifiers.add("FALSE"); + ActiveMQMessage.reservedIdentifiers.add("NOT"); + ActiveMQMessage.reservedIdentifiers.add("AND"); + ActiveMQMessage.reservedIdentifiers.add("OR"); + ActiveMQMessage.reservedIdentifiers.add("BETWEEN"); + ActiveMQMessage.reservedIdentifiers.add("LIKE"); + ActiveMQMessage.reservedIdentifiers.add("IN"); + ActiveMQMessage.reservedIdentifiers.add("IS"); + ActiveMQMessage.reservedIdentifiers.add("ESCAPE"); + } + + public static ActiveMQMessage createMessage(final ClientMessage message, final ClientSession session) + { + int type = message.getType(); + + ActiveMQMessage msg; + + switch (type) + { + case ActiveMQMessage.TYPE: // 0 + { + msg = new ActiveMQMessage(message, session); + break; + } + case ActiveMQBytesMessage.TYPE: // 4 + { + msg = new ActiveMQBytesMessage(message, session); + break; + } + case ActiveMQMapMessage.TYPE: // 5 + { + msg = new ActiveMQMapMessage(message, session); + break; + } + case ActiveMQObjectMessage.TYPE: + { + msg = new ActiveMQObjectMessage(message, session); + break; + } + case ActiveMQStreamMessage.TYPE: // 6 + { + msg = new ActiveMQStreamMessage(message, session); + break; + } + case ActiveMQTextMessage.TYPE: // 3 + { + msg = new ActiveMQTextMessage(message, session); + break; + } + default: + { + throw new JMSRuntimeException("Invalid message type " + type); + } + } + + return msg; + } + + // Attributes ---------------------------------------------------- + + // The underlying message + protected ClientMessage message; + + private ClientSession session; + + // Read-only? + protected boolean readOnly; + + // Properties read-only? + protected boolean propertiesReadOnly; + + // Cache it + private Destination dest; + + // Cache it + private String msgID; + + // Cache it + private Destination replyTo; + + // Cache it + private String jmsCorrelationID; + + // Cache it + private String jmsType; + + private boolean individualAck; + + private long jmsDeliveryTime; + + // Constructors -------------------------------------------------- + + /* + * Create a new message prior to sending + */ + protected ActiveMQMessage(final byte type, final ClientSession session) + { + message = session.createMessage(type, true, 0, System.currentTimeMillis(), (byte)4); + + } + + protected ActiveMQMessage(final ClientSession session) + { + this(ActiveMQMessage.TYPE, session); + } + + /** + * Constructor for when receiving a message from the server + */ + public ActiveMQMessage(final ClientMessage message, final ClientSession session) + { + this.message = message; + + readOnly = true; + + propertiesReadOnly = true; + + this.session = session; + } + + /* + * A constructor that takes a foreign message + */ + public ActiveMQMessage(final Message foreign, final ClientSession session) throws JMSException + { + this(foreign, ActiveMQMessage.TYPE, session); + } + + public ActiveMQMessage() + { + } + + protected ActiveMQMessage(final Message foreign, final byte type, final ClientSession session) throws JMSException + { + this(type, session); + + setJMSTimestamp(foreign.getJMSTimestamp()); + + String value = System.getProperty(ActiveMQJMSConstants.JMS_ACTIVEMQ_ENABLE_BYTE_ARRAY_JMS_CORRELATION_ID_PROPERTY_NAME); + + boolean supportBytesId = !"false".equals(value); + + if (supportBytesId) + { + try + { + byte[] corrIDBytes = foreign.getJMSCorrelationIDAsBytes(); + setJMSCorrelationIDAsBytes(corrIDBytes); + } + catch (JMSException e) + { + // specified as String + String corrIDString = foreign.getJMSCorrelationID(); + if (corrIDString != null) + { + setJMSCorrelationID(corrIDString); + } + } + } + else + { + // Some providers, like WSMQ do automatic conversions between native byte[] correlation id + // and String correlation id. This makes it impossible for ActiveMQ to guarantee to return the correct + // type as set by the user + // So we allow the behaviour to be overridden by a system property + // https://jira.jboss.org/jira/browse/HORNETQ-356 + // https://jira.jboss.org/jira/browse/HORNETQ-332 + String corrIDString = foreign.getJMSCorrelationID(); + if (corrIDString != null) + { + setJMSCorrelationID(corrIDString); + } + } + + setJMSReplyTo(foreign.getJMSReplyTo()); + setJMSDestination(foreign.getJMSDestination()); + setJMSDeliveryMode(foreign.getJMSDeliveryMode()); + setJMSExpiration(foreign.getJMSExpiration()); + setJMSPriority(foreign.getJMSPriority()); + setJMSType(foreign.getJMSType()); + + // We can't avoid a cast warning here since getPropertyNames() is on the JMS API + for (Enumeration<String> props = foreign.getPropertyNames(); props.hasMoreElements();) + { + String name = props.nextElement(); + + Object prop = foreign.getObjectProperty(name); + + setObjectProperty(name, prop); + } + } + + // javax.jmx.Message implementation ------------------------------ + + public String getJMSMessageID() + { + if (msgID == null) + { + UUID uid = message.getUserID(); + + msgID = uid == null ? null : "ID:" + uid.toString(); + } + return msgID; + } + + public void setJMSMessageID(final String jmsMessageID) throws JMSException + { + if (jmsMessageID != null && !jmsMessageID.startsWith("ID:")) + { + throw new JMSException("JMSMessageID must start with ID:"); + } + + message.setUserID(null); + + msgID = jmsMessageID; + } + + public long getJMSTimestamp() throws JMSException + { + return message.getTimestamp(); + } + + public void setJMSTimestamp(final long timestamp) throws JMSException + { + message.setTimestamp(timestamp); + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException + { + return MessageUtil.getJMSCorrelationIDAsBytes(message); + } + + public void setJMSCorrelationIDAsBytes(final byte[] correlationID) throws JMSException + { + try + { + MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID); + } + catch (ActiveMQException e) + { + JMSException ex = new JMSException(e.getMessage()); + ex.initCause(e); + throw ex; + } + } + + public void setJMSCorrelationID(final String correlationID) throws JMSException + { + MessageUtil.setJMSCorrelationID(message, correlationID); + jmsCorrelationID = correlationID; + } + + public String getJMSCorrelationID() throws JMSException + { + if (jmsCorrelationID == null) + { + jmsCorrelationID = MessageUtil.getJMSCorrelationID(message); + } + + return jmsCorrelationID; + } + + public Destination getJMSReplyTo() throws JMSException + { + if (replyTo == null) + { + + SimpleString repl = MessageUtil.getJMSReplyTo(message); + + if (repl != null) + { + replyTo = ActiveMQDestination.fromAddress(repl.toString()); + } + } + return replyTo; + } + + public void setJMSReplyTo(final Destination dest) throws JMSException + { + + if (dest == null) + { + MessageUtil.setJMSReplyTo(message, null); + replyTo = null; + } + else + { + if (dest instanceof ActiveMQDestination == false) + { + throw new InvalidDestinationException("Not a ActiveMQ destination " + dest); + } + + ActiveMQDestination jbd = (ActiveMQDestination)dest; + + MessageUtil.setJMSReplyTo(message, jbd.getSimpleAddress()); + + replyTo = jbd; + } + } + + public Destination getJMSDestination() throws JMSException + { + if (dest == null) + { + SimpleString sdest = message.getAddress(); + + dest = sdest == null ? null : ActiveMQDestination.fromAddress(sdest.toString()); + } + + return dest; + } + + public void setJMSDestination(final Destination destination) throws JMSException + { + dest = destination; + } + + public int getJMSDeliveryMode() throws JMSException + { + return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; + } + + public void setJMSDeliveryMode(final int deliveryMode) throws JMSException + { + if (deliveryMode == DeliveryMode.PERSISTENT) + { + message.setDurable(true); + } + else if (deliveryMode == DeliveryMode.NON_PERSISTENT) + { + message.setDurable(false); + } + else + { + throw ActiveMQJMSClientBundle.BUNDLE.illegalDeliveryMode(deliveryMode); + } + } + + public boolean getJMSRedelivered() throws JMSException + { + return message.getDeliveryCount() > 1; + } + + public void setJMSRedelivered(final boolean redelivered) throws JMSException + { + if (!redelivered) + { + message.setDeliveryCount(1); + } + else + { + if (message.getDeliveryCount() > 1) + { + // do nothing + } + else + { + message.setDeliveryCount(2); + } + } + } + + public void setJMSType(final String type) throws JMSException + { + if (type != null) + { + MessageUtil.setJMSType(message, type); + + jmsType = type; + } + } + + public String getJMSType() throws JMSException + { + if (jmsType == null) + { + jmsType = MessageUtil.getJMSType(message); + } + return jmsType; + } + + public long getJMSExpiration() throws JMSException + { + return message.getExpiration(); + } + + public void setJMSExpiration(final long expiration) throws JMSException + { + message.setExpiration(expiration); + } + + public int getJMSPriority() throws JMSException + { + return message.getPriority(); + } + + public void setJMSPriority(final int priority) throws JMSException + { + checkPriority(priority); + + message.setPriority((byte)priority); + } + + public void clearProperties() throws JMSException + { + + MessageUtil.clearProperties(message); + + propertiesReadOnly = false; + } + + public void clearBody() throws JMSException + { + readOnly = false; + } + + public boolean propertyExists(final String name) throws JMSException + { + return MessageUtil.propertyExists(message, name); + } + + public boolean getBooleanProperty(final String name) throws JMSException + { + try + { + return message.getBooleanProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public byte getByteProperty(final String name) throws JMSException + { + try + { + return message.getByteProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public short getShortProperty(final String name) throws JMSException + { + try + { + return message.getShortProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public int getIntProperty(final String name) throws JMSException + { + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) + { + return message.getDeliveryCount(); + } + + try + { + return message.getIntProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public long getLongProperty(final String name) throws JMSException + { + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) + { + return message.getDeliveryCount(); + } + + try + { + return message.getLongProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public float getFloatProperty(final String name) throws JMSException + { + try + { + return message.getFloatProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public double getDoubleProperty(final String name) throws JMSException + { + try + { + return message.getDoubleProperty(new SimpleString(name)); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public String getStringProperty(final String name) throws JMSException + { + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) + { + return String.valueOf(message.getDeliveryCount()); + } + + try + { + if (MessageUtil.JMSXGROUPID.equals(name)) + { + return message.getStringProperty(org.apache.activemq.api.core.Message.HDR_GROUP_ID); + } + else + { + return message.getStringProperty(new SimpleString(name)); + } + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public Object getObjectProperty(final String name) throws JMSException + { + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) + { + return String.valueOf(message.getDeliveryCount()); + } + + Object val = message.getObjectProperty(name); + if (val instanceof SimpleString) + { + val = ((SimpleString)val).toString(); + } + return val; + } + + @SuppressWarnings("rawtypes") + @Override + public Enumeration getPropertyNames() throws JMSException + { + return Collections.enumeration(MessageUtil.getPropertyNames(message)); + } + + public void setBooleanProperty(final String name, final boolean value) throws JMSException + { + checkProperty(name); + + message.putBooleanProperty(new SimpleString(name), value); + } + + public void setByteProperty(final String name, final byte value) throws JMSException + { + checkProperty(name); + message.putByteProperty(new SimpleString(name), value); + } + + public void setShortProperty(final String name, final short value) throws JMSException + { + checkProperty(name); + message.putShortProperty(new SimpleString(name), value); + } + + public void setIntProperty(final String name, final int value) throws JMSException + { + checkProperty(name); + message.putIntProperty(new SimpleString(name), value); + } + + public void setLongProperty(final String name, final long value) throws JMSException + { + checkProperty(name); + message.putLongProperty(new SimpleString(name), value); + } + + public void setFloatProperty(final String name, final float value) throws JMSException + { + checkProperty(name); + message.putFloatProperty(new SimpleString(name), value); + } + + public void setDoubleProperty(final String name, final double value) throws JMSException + { + checkProperty(name); + message.putDoubleProperty(new SimpleString(name), value); + } + + public void setStringProperty(final String name, final String value) throws JMSException + { + checkProperty(name); + + if (MessageUtil.JMSXGROUPID.equals(name)) + { + message.putStringProperty(org.apache.activemq.api.core.Message.HDR_GROUP_ID, SimpleString.toSimpleString(value)); + } + else + { + message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value)); + } + } + + public void setObjectProperty(final String name, final Object value) throws JMSException + { + if (ActiveMQJMSConstants.JMS_ACTIVEMQ_OUTPUT_STREAM.equals(name)) + { + setOutputStream((OutputStream)value); + + return; + } + else if (ActiveMQJMSConstants.JMS_ACTIVEMQ_SAVE_STREAM.equals(name)) + { + saveToOutputStream((OutputStream)value); + + return; + } + + checkProperty(name); + + if (ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM.equals(name)) + { + setInputStream((InputStream)value); + + return; + } + + try + { + message.putObjectProperty(new SimpleString(name), value); + } + catch (ActiveMQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public void acknowledge() throws JMSException + { + if (session != null) + { + try + { + if (individualAck) + { + message.individualAcknowledge(); + } + + session.commit(); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + } + + @Override + public long getJMSDeliveryTime() throws JMSException + { + Long value; + try + { + value = message.getLongProperty(org.apache.activemq.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME); + } + catch (Exception e) + { + return 0; + } + + if (value == null) + { + return 0; + } + else + { + return value.longValue(); + } + } + + @Override + public void setJMSDeliveryTime(long deliveryTime) throws JMSException + { + message.putLongProperty(org.apache.activemq.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime); + } + + @Override + public <T> T getBody(Class<T> c) throws JMSException + { + if (isBodyAssignableTo(c)) + { + return getBodyInternal(c); + } + // XXX HORNETQ-1209 Do we need translations here? + throw new MessageFormatException("Body not assignable to " + c); + } + + @SuppressWarnings("unchecked") + protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException + { + InputStream is = ((MessageInternal)message).getBodyInputStream(); + try + { + ObjectInputStream ois = new ObjectInputStream(is); + return (T)ois.readObject(); + } + catch (Exception e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + + @Override + public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class c) + { + /** + * From the specs: + * <p> + * If the message is a {@code Message} (but not one of its subtypes) then this method will + * return true irrespective of the value of this parameter. + */ + return true; + } + + /** + * Helper method for {@link #isBodyAssignableTo(Class)}. + * @return true if the message has no body. + */ + protected boolean hasNoBody() + { + return message.getBodySize() == 0; + } + + // Public -------------------------------------------------------- + + public void setIndividualAcknowledge() + { + this.individualAck = true; + } + + public void resetMessageID(final String newMsgID) + { + this.msgID = newMsgID; + } + + public ClientMessage getCoreMessage() + { + return message; + } + + public void doBeforeSend() throws Exception + { + message.getBodyBuffer().resetReaderIndex(); + } + + public void checkBuffer() + { + message.getBodyBuffer(); + } + + public void doBeforeReceive() throws ActiveMQException + { + message.checkCompletion(); + + ActiveMQBuffer body = message.getBodyBuffer(); + + if (body != null) + { + body.resetReaderIndex(); + } + } + + public byte getType() + { + return ActiveMQMessage.TYPE; + } + + public void setInputStream(final InputStream input) throws JMSException + { + checkStream(); + if (readOnly) + { + throw ActiveMQJMSClientBundle.BUNDLE.messageNotWritable(); + } + + message.setBodyInputStream(input); + } + + public void setOutputStream(final OutputStream output) throws JMSException + { + checkStream(); + if (!readOnly) + { + throw new IllegalStateException("OutputStream property is only valid on received messages"); + } + + try + { + message.setOutputStream(output); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public void saveToOutputStream(final OutputStream output) throws JMSException + { + checkStream(); + if (!readOnly) + { + throw new IllegalStateException("OutputStream property is only valid on received messages"); + } + + try + { + message.saveToOutputStream(output); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public boolean waitCompletionOnStream(final long timeWait) throws JMSException + { + checkStream(); + try + { + return message.waitOutputStreamCompletion(timeWait); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + @Override + public String toString() + { + StringBuffer sb = new StringBuffer("ActiveMQMessage["); + sb.append(getJMSMessageID()); + sb.append("]:"); + sb.append(message.isDurable() ? "PERSISTENT" : "NON-PERSISTENT"); + return sb.toString(); + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + protected void checkWrite() throws JMSException + { + if (readOnly) + { + throw ActiveMQJMSClientBundle.BUNDLE.messageNotWritable(); + } + } + + protected void checkRead() throws JMSException + { + if (!readOnly) + { + throw ActiveMQJMSClientBundle.BUNDLE.messageNotReadable(); + } + } + + // Private ------------------------------------------------------------ + + private void checkStream() throws JMSException + { + if (!(message.getType() == ActiveMQBytesMessage.TYPE || message.getType() == ActiveMQStreamMessage.TYPE)) + { + throw ActiveMQJMSClientBundle.BUNDLE.onlyValidForByteOrStreamMessages(); + } + } + + private void checkProperty(final String name) throws JMSException + { + if (propertiesReadOnly) + { + if (name.equals(ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM)) + { + throw new MessageNotWriteableException("You cannot set the Input Stream on received messages. Did you mean " + ActiveMQJMSConstants.JMS_ACTIVEMQ_OUTPUT_STREAM + + " or " + + ActiveMQJMSConstants.JMS_ACTIVEMQ_SAVE_STREAM + + "?"); + } + else + { + throw ActiveMQJMSClientBundle.BUNDLE.messageNotWritable(); + } + } + + if (name == null) + { + throw ActiveMQJMSClientBundle.BUNDLE.nullArgumentNotAllowed("property"); + } + + if (name.equals("")) + { + throw new IllegalArgumentException("The name of a property must not be an empty String."); + } + + if (!isValidJavaIdentifier(name)) + { + throw ActiveMQJMSClientBundle.BUNDLE.invalidJavaIdentifier(name); + } + + if (ActiveMQMessage.reservedIdentifiers.contains(name)) + { + throw new JMSRuntimeException("The property name '" + name + "' is reserved due to selector syntax."); + } + + if (name.startsWith("JMS_ACTIVEMQ")) + { + throw new JMSRuntimeException("The property name '" + name + "' is illegal since it starts with JMS_ACTIVEMQ"); + } + } + + private boolean isValidJavaIdentifier(final String s) + { + if (s == null || s.length() == 0) + { + return false; + } + + char[] c = s.toCharArray(); + + if (!Character.isJavaIdentifierStart(c[0])) + { + return false; + } + + for (int i = 1; i < c.length; i++) + { + if (!Character.isJavaIdentifierPart(c[i])) + { + return false; + } + } + + return true; + } + + private void checkPriority(final int priority) throws JMSException + { + if (priority < 0 || priority > 9) + { + throw new JMSException(priority + " is not valid: priority must be between 0 and 9"); + } + } + + // Inner classes ------------------------------------------------- +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageConsumer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageConsumer.java new file mode 100644 index 0000000..e721b3c --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageConsumer.java @@ -0,0 +1,253 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.QueueReceiver; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientConsumer; +import org.apache.activemq.api.core.client.ClientMessage; +import org.apache.activemq.api.core.client.MessageHandler; +import org.apache.activemq.api.jms.ActiveMQJMSConstants; + +/** + * ActiveMQ implementation of a JMS MessageConsumer. + * + * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> + */ +public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscriber +{ + private final ClientConsumer consumer; + + private MessageListener listener; + + private MessageHandler coreListener; + + private final ActiveMQConnection connection; + + private final ActiveMQSession session; + + private final int ackMode; + + private final boolean noLocal; + + private final ActiveMQDestination destination; + + private final String selector; + + private final SimpleString autoDeleteQueueName; + + // Constructors -------------------------------------------------- + + protected ActiveMQMessageConsumer(final ActiveMQConnection connection, + final ActiveMQSession session, + final ClientConsumer consumer, + final boolean noLocal, + final ActiveMQDestination destination, + final String selector, + final SimpleString autoDeleteQueueName) throws JMSException + { + this.connection = connection; + + this.session = session; + + this.consumer = consumer; + + ackMode = session.getAcknowledgeMode(); + + this.noLocal = noLocal; + + this.destination = destination; + + this.selector = selector; + + this.autoDeleteQueueName = autoDeleteQueueName; + } + + // MessageConsumer implementation -------------------------------- + + public String getMessageSelector() throws JMSException + { + checkClosed(); + + return selector; + } + + public MessageListener getMessageListener() throws JMSException + { + checkClosed(); + + return listener; + } + + public void setMessageListener(final MessageListener listener) throws JMSException + { + this.listener = listener; + + coreListener = listener == null ? null : new JMSMessageListenerWrapper(connection, session, consumer, listener, ackMode); + + try + { + consumer.setMessageHandler(coreListener); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public Message receive() throws JMSException + { + return getMessage(0, false); + } + + public Message receive(final long timeout) throws JMSException + { + return getMessage(timeout, false); + } + + public Message receiveNoWait() throws JMSException + { + return getMessage(0, true); + } + + public void close() throws JMSException + { + try + { + consumer.close(); + + if (autoDeleteQueueName != null) + { + // If non durable subscriber need to delete subscription too + session.deleteQueue(autoDeleteQueueName); + } + + session.removeConsumer(this); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + // QueueReceiver implementation ---------------------------------- + + public Queue getQueue() throws JMSException + { + checkClosed(); + + return (Queue)destination; + } + + // TopicSubscriber implementation -------------------------------- + + public Topic getTopic() throws JMSException + { + checkClosed(); + + return (Topic)destination; + } + + public boolean getNoLocal() throws JMSException + { + checkClosed(); + + return noLocal; + } + + // Public -------------------------------------------------------- + + @Override + public String toString() + { + return "ActiveMQMessageConsumer[" + consumer + "]"; + } + + public boolean isClosed() + { + return consumer.isClosed(); + } + + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + private void checkClosed() throws JMSException + { + if (consumer.isClosed() || session.getCoreSession().isClosed()) + { + throw new IllegalStateException("Consumer is closed"); + } + } + + private ActiveMQMessage getMessage(final long timeout, final boolean noWait) throws JMSException + { + try + { + ClientMessage coreMessage; + + if (noWait) + { + coreMessage = consumer.receiveImmediate(); + } + else + { + coreMessage = consumer.receive(timeout); + } + + ActiveMQMessage jmsMsg = null; + + if (coreMessage != null) + { + boolean needSession = + ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE; + jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? session.getCoreSession() : null); + + jmsMsg.doBeforeReceive(); + + // We Do the ack after doBeforeRecive, as in the case of large messages, this may fail so we don't want messages redelivered + // https://issues.jboss.org/browse/JBPAPP-6110 + if (session.getAcknowledgeMode() == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) + { + jmsMsg.setIndividualAcknowledge(); + } + else + { + coreMessage.acknowledge(); + } + } + + return jmsMsg; + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java new file mode 100644 index 0000000..00aed53 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java @@ -0,0 +1,601 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.BytesMessage; +import javax.jms.CompletionListener; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueSender; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientMessage; +import org.apache.activemq.api.core.client.ClientProducer; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.utils.UUID; +import org.apache.activemq.utils.UUIDGenerator; +/** + * ActiveMQ implementation of a JMS MessageProducer. + * + * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> + * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> + * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> + */ +public class ActiveMQMessageProducer implements MessageProducer, QueueSender, TopicPublisher +{ + private final ActiveMQConnection connection; + + private final SimpleString connID; + + private final ClientProducer clientProducer; + private final ClientSession clientSession; + + private boolean disableMessageID = false; + + private boolean disableMessageTimestamp = false; + + private int defaultPriority = Message.DEFAULT_PRIORITY; + private long defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE; + private int defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE; + private long defaultDeliveryDelay = Message.DEFAULT_DELIVERY_DELAY; + + private final ActiveMQDestination defaultDestination; + // Constructors -------------------------------------------------- + + protected ActiveMQMessageProducer(final ActiveMQConnection connection, final ClientProducer producer, + final ActiveMQDestination defaultDestination, final ClientSession clientSession) throws JMSException + { + this.connection = connection; + + connID = connection.getClientID() != null ? new SimpleString(connection.getClientID()) : connection.getUID(); + + this.clientProducer = producer; + + this.defaultDestination = defaultDestination; + + this.clientSession = clientSession; + } + + // MessageProducer implementation -------------------------------- + + public void setDisableMessageID(final boolean value) throws JMSException + { + checkClosed(); + + disableMessageID = value; + } + + public boolean getDisableMessageID() throws JMSException + { + checkClosed(); + + return disableMessageID; + } + + public void setDisableMessageTimestamp(final boolean value) throws JMSException + { + checkClosed(); + + disableMessageTimestamp = value; + } + + public boolean getDisableMessageTimestamp() throws JMSException + { + checkClosed(); + + return disableMessageTimestamp; + } + + public void setDeliveryMode(final int deliveryMode) throws JMSException + { + checkClosed(); + if (deliveryMode != DeliveryMode.NON_PERSISTENT && deliveryMode != DeliveryMode.PERSISTENT) + { + throw ActiveMQJMSClientBundle.BUNDLE.illegalDeliveryMode(deliveryMode); + } + + defaultDeliveryMode = deliveryMode; + } + + public int getDeliveryMode() throws JMSException + { + checkClosed(); + + return defaultDeliveryMode; + } + + public void setPriority(final int defaultPriority) throws JMSException + { + checkClosed(); + + if (defaultPriority < 0 || defaultPriority > 9) + { + throw new JMSException("Illegal priority value: " + defaultPriority); + } + + this.defaultPriority = defaultPriority; + } + + public int getPriority() throws JMSException + { + checkClosed(); + + return defaultPriority; + } + + public void setTimeToLive(final long timeToLive) throws JMSException + { + checkClosed(); + + defaultTimeToLive = timeToLive; + } + + public long getTimeToLive() throws JMSException + { + checkClosed(); + + return defaultTimeToLive; + } + + public Destination getDestination() throws JMSException + { + checkClosed(); + + return defaultDestination; + } + + public void close() throws JMSException + { + connection.getThreadAwareContext().assertNotCompletionListenerThread(); + try + { + clientProducer.close(); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public void send(final Message message) throws JMSException + { + checkDefaultDestination(); + doSendx(defaultDestination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive, null); + } + + public void send(final Message message, + final int deliveryMode, + final int priority, final long timeToLive) throws JMSException + { + checkDefaultDestination(); + doSendx(defaultDestination, message, deliveryMode, priority, timeToLive, null); + } + + public void send(final Destination destination, final Message message) throws JMSException + { + send(destination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive); + } + + public void send(final Destination destination, final Message message, final int deliveryMode, final int priority, + final long timeToLive) throws JMSException + { + checkClosed(); + + checkDestination(destination); + + doSendx((ActiveMQDestination)destination, message, deliveryMode, priority, timeToLive, null); + } + + @Override + public void setDeliveryDelay(long deliveryDelay) throws JMSException + { + this.defaultDeliveryDelay = deliveryDelay; + } + + @Override + public long getDeliveryDelay() throws JMSException + { + return defaultDeliveryDelay; + } + + @Override + public void send(Message message, CompletionListener completionListener) throws JMSException + { + send(message, defaultDeliveryMode, defaultPriority, defaultTimeToLive, completionListener); + } + + @Override + public void send(Message message, int deliveryMode, int priority, long timeToLive, + CompletionListener completionListener) throws JMSException + { + checkCompletionListener(completionListener); + checkDefaultDestination(); + doSendx(defaultDestination, message, deliveryMode, priority, timeToLive, completionListener); + } + + @Override + public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException + { + send(destination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive, completionListener); + } + + @Override + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + CompletionListener completionListener) throws JMSException + { + checkClosed(); + + checkCompletionListener(completionListener); + + checkDestination(destination); + + doSendx((ActiveMQDestination)destination, message, deliveryMode, priority, timeToLive, completionListener); + } + + // TopicPublisher Implementation --------------------------------- + + public Topic getTopic() throws JMSException + { + return (Topic)getDestination(); + } + + public void publish(final Message message) throws JMSException + { + send(message); + } + + public void publish(final Topic topic, final Message message) throws JMSException + { + send(topic, message); + } + + public void publish(final Message message, final int deliveryMode, final int priority, final long timeToLive) throws JMSException + { + send(message, deliveryMode, priority, timeToLive); + } + + public void publish(final Topic topic, final Message message, final int deliveryMode, final int priority, + final long timeToLive) throws JMSException + { + checkDestination(topic); + doSendx((ActiveMQDestination)topic, message, deliveryMode, priority, timeToLive, null); + } + + // QueueSender Implementation ------------------------------------ + + public void send(final Queue queue, final Message message) throws JMSException + { + send((Destination)queue, message); + } + + public void send(final Queue queue, final Message message, final int deliveryMode, final int priority, + final long timeToLive) throws JMSException + { + checkDestination(queue); + doSendx((ActiveMQDestination)queue, message, deliveryMode, priority, timeToLive, null); + } + + public Queue getQueue() throws JMSException + { + return (Queue)getDestination(); + } + + // Public -------------------------------------------------------- + + @Override + public String toString() + { + return "ActiveMQMessageProducer->" + clientProducer; + } + + /** + * Check if the default destination has been set + */ + private void checkDefaultDestination() + { + if (defaultDestination == null) + { + throw new UnsupportedOperationException("Cannot specify destination if producer has a default destination"); + } + } + + /** + * Check if the destination is sent correctly + */ + private void checkDestination(Destination destination) throws InvalidDestinationException + { + if (destination != null && !(destination instanceof ActiveMQDestination)) + { + throw new InvalidDestinationException("Not a ActiveMQ Destination:" + destination); + } + if (destination != null && defaultDestination != null) + { + throw new UnsupportedOperationException("Cannot specify destination if producer has a default destination"); + } + if (destination == null) + { + throw ActiveMQJMSClientBundle.BUNDLE.nullTopic(); + } + } + + private void checkCompletionListener(CompletionListener completionListener) + { + if (completionListener == null) + { + throw ActiveMQJMSClientBundle.BUNDLE.nullArgumentNotAllowed("CompletionListener"); + } + } + + + private void doSendx(ActiveMQDestination destination, final Message jmsMessage, final int deliveryMode, + final int priority, final long timeToLive, + CompletionListener completionListener) throws JMSException + { + + jmsMessage.setJMSDeliveryMode(deliveryMode); + + jmsMessage.setJMSPriority(priority); + + + if (timeToLive == 0) + { + jmsMessage.setJMSExpiration(0); + } + else + { + jmsMessage.setJMSExpiration(System.currentTimeMillis() + timeToLive); + } + + if (!disableMessageTimestamp) + { + jmsMessage.setJMSTimestamp(System.currentTimeMillis()); + } + else + { + jmsMessage.setJMSTimestamp(0); + } + + SimpleString address = null; + + if (destination == null) + { + if (defaultDestination == null) + { + throw new UnsupportedOperationException("Destination must be specified on send with an anonymous producer"); + } + + destination = defaultDestination; + } + else + { + if (defaultDestination != null) + { + if (!destination.equals(defaultDestination)) + { + throw new UnsupportedOperationException("Where a default destination is specified " + "for the sender and a destination is " + + "specified in the arguments to the send, " + + "these destinations must be equal"); + } + } + + address = destination.getSimpleAddress(); + + if (!connection.containsKnownDestination(address)) + { + try + { + ClientSession.AddressQuery query = clientSession.addressQuery(address); + if (!query.isExists()) + { + throw new InvalidDestinationException("Destination " + address + " does not exist"); + } + else + { + connection.addKnownDestination(address); + } + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + } + + ActiveMQMessage activeMQJmsMessage; + + boolean foreign = false; + + // First convert from foreign message if appropriate + if (!(jmsMessage instanceof ActiveMQMessage)) + { + // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client, + // a message whose implementation is not one of its own. + + if (jmsMessage instanceof BytesMessage) + { + activeMQJmsMessage = new ActiveMQBytesMessage((BytesMessage)jmsMessage, clientSession); + } + else if (jmsMessage instanceof MapMessage) + { + activeMQJmsMessage = new ActiveMQMapMessage((MapMessage)jmsMessage, clientSession); + } + else if (jmsMessage instanceof ObjectMessage) + { + activeMQJmsMessage = new ActiveMQObjectMessage((ObjectMessage)jmsMessage, clientSession); + } + else if (jmsMessage instanceof StreamMessage) + { + activeMQJmsMessage = new ActiveMQStreamMessage((StreamMessage)jmsMessage, clientSession); + } + else if (jmsMessage instanceof TextMessage) + { + activeMQJmsMessage = new ActiveMQTextMessage((TextMessage)jmsMessage, clientSession); + } + else + { + activeMQJmsMessage = new ActiveMQMessage(jmsMessage, clientSession); + } + + // Set the destination on the original message + jmsMessage.setJMSDestination(destination); + + foreign = true; + } + else + { + activeMQJmsMessage = (ActiveMQMessage)jmsMessage; + } + + if (!disableMessageID) + { + // Generate a JMS id + + UUID uid = UUIDGenerator.getInstance().generateUUID(); + + activeMQJmsMessage.getCoreMessage().setUserID(uid); + + activeMQJmsMessage.resetMessageID(null); + } + + if (foreign) + { + jmsMessage.setJMSMessageID(activeMQJmsMessage.getJMSMessageID()); + } + + activeMQJmsMessage.setJMSDestination(destination); + + try + { + activeMQJmsMessage.doBeforeSend(); + } + catch (Exception e) + { + JMSException je = new JMSException(e.getMessage()); + + je.initCause(e); + + throw je; + } + + if (defaultDeliveryDelay > 0) + { + activeMQJmsMessage.setJMSDeliveryTime(System.currentTimeMillis() + defaultDeliveryDelay); + } + + ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage(); + coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID); + + try + { + /** + * Using a completionListener requires wrapping using a {@link CompletionListenerWrapper}, + * so we avoid it if we can. + */ + if (completionListener != null) + { + clientProducer.send(address, coreMessage, new CompletionListenerWrapper(completionListener, jmsMessage, this)); + } + else + { + clientProducer.send(address, coreMessage); + } + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + private void checkClosed() throws JMSException + { + if (clientProducer.isClosed() || clientSession.isClosed()) + { + throw new IllegalStateException("Producer is closed"); + } + } + + private static final class CompletionListenerWrapper implements SendAcknowledgementHandler + { + private final CompletionListener completionListener; + private final Message jmsMessage; + private final ActiveMQMessageProducer producer; + + /** + * @param jmsMessage + * @param producer + */ + public CompletionListenerWrapper(CompletionListener listener, Message jmsMessage, ActiveMQMessageProducer producer) + { + this.completionListener = listener; + this.jmsMessage = jmsMessage; + this.producer = producer; + } + + @Override + public void sendAcknowledged(org.apache.activemq.api.core.Message clientMessage) + { + if (jmsMessage instanceof StreamMessage) + { + try + { + ((StreamMessage)jmsMessage).reset(); + } + catch (JMSException e) + { + // HORNETQ-1209 XXX ignore? + } + } + if (jmsMessage instanceof BytesMessage) + { + try + { + ((BytesMessage)jmsMessage).reset(); + } + catch (JMSException e) + { + // HORNETQ-1209 XXX ignore? + } + } + + try + { + producer.connection.getThreadAwareContext().setCurrentThread(true); + completionListener.onCompletion(jmsMessage); + } + finally + { + producer.connection.getThreadAwareContext().clearCurrentThread(true); + } + } + + @Override + public String toString() + { + return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")"; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQObjectMessage.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQObjectMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQObjectMessage.java new file mode 100644 index 0000000..8e7a1aa --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQObjectMessage.java @@ -0,0 +1,202 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.ObjectMessage; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.Message; +import org.apache.activemq.api.core.client.ClientMessage; +import org.apache.activemq.api.core.client.ClientSession; + +/** + * ActiveMQ implementation of a JMS ObjectMessage. + * <br> + * Don't used ObjectMessage if you want good performance! + * <p> + * Serialization is slooooow! + * + * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> + * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> + * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> + */ +public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage +{ + // Constants ----------------------------------------------------- + + public static final byte TYPE = Message.OBJECT_TYPE; + + // Attributes ---------------------------------------------------- + + // keep a snapshot of the Serializable Object as a byte[] to provide Object isolation + private byte[] data; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + protected ActiveMQObjectMessage(final ClientSession session) + { + super(ActiveMQObjectMessage.TYPE, session); + } + + protected ActiveMQObjectMessage(final ClientMessage message, final ClientSession session) + { + super(message, session); + } + + /** + * A copy constructor for foreign JMS ObjectMessages. + */ + public ActiveMQObjectMessage(final ObjectMessage foreign, final ClientSession session) throws JMSException + { + super(foreign, ActiveMQObjectMessage.TYPE, session); + + setObject(foreign.getObject()); + } + + // Public -------------------------------------------------------- + + @Override + public byte getType() + { + return ActiveMQObjectMessage.TYPE; + } + + @Override + public void doBeforeSend() throws Exception + { + message.getBodyBuffer().clear(); + if (data != null) + { + message.getBodyBuffer().writeInt(data.length); + message.getBodyBuffer().writeBytes(data); + } + + super.doBeforeSend(); + } + + @Override + public void doBeforeReceive() throws ActiveMQException + { + super.doBeforeReceive(); + try + { + int len = message.getBodyBuffer().readInt(); + data = new byte[len]; + message.getBodyBuffer().readBytes(data); + } + catch (Exception e) + { + data = null; + } + + } + + // ObjectMessage implementation ---------------------------------- + + public void setObject(final Serializable object) throws JMSException + { + checkWrite(); + + if (object != null) + { + try + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(object); + + oos.flush(); + + data = baos.toByteArray(); + } + catch (Exception e) + { + JMSException je = new JMSException("Failed to serialize object"); + je.setLinkedException(e); + je.initCause(e); + throw je; + } + } + } + + // lazy deserialize the Object the first time the client requests it + public Serializable getObject() throws JMSException + { + if (data == null || data.length == 0) + { + return null; + } + + try + { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new org.apache.activemq.utils.ObjectInputStreamWithClassLoader(bais); + Serializable object = (Serializable)ois.readObject(); + return object; + } + catch (Exception e) + { + JMSException je = new JMSException(e.getMessage()); + je.setStackTrace(e.getStackTrace()); + throw je; + } + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + data = null; + } + + @Override + protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException + { + try + { + return (T)getObject(); + } + catch (JMSException e) + { + throw new MessageFormatException("Deserialization error on ActiveMQObjectMessage"); + } + } + + @Override + public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") + Class c) + { + if (data == null) // we have no body + return true; + try + { + return Serializable.class == c || Object.class == c || c.isInstance(getObject()); + } + catch (JMSException e) + { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueue.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueue.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueue.java new file mode 100644 index 0000000..6bb32bd --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueue.java @@ -0,0 +1,90 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.Queue; + +import org.apache.activemq.api.core.SimpleString; + +/** + * ActiveMQ implementation of a JMS Queue. + * <br> + * This class can be instantiated directly. + * + * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> + * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> + * @version <tt>$Revision: 8737 $</tt> + * + */ +public class ActiveMQQueue extends ActiveMQDestination implements Queue +{ + // Constants ----------------------------------------------------- + private static final long serialVersionUID = -1106092883162295462L; + + // Static -------------------------------------------------------- + + public static SimpleString createAddressFromName(final String name) + { + return new SimpleString(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX + name); + } + + // Attributes ---------------------------------------------------- + + // Constructors -------------------------------------------------- + + public ActiveMQQueue(final String name) + { + super(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX + name, name, false, true, null); + } + + + + /** + * @param address + * @param name + * @param temporary + * @param session + */ + public ActiveMQQueue(String address, String name, boolean temporary, ActiveMQSession session) + { + super(address, name, temporary, true, session); + } + + public ActiveMQQueue(final String address, final String name) + { + super(address, name, false, true, null); + } + + // Queue implementation ------------------------------------------ + + // Public -------------------------------------------------------- + + public String getQueueName() + { + return name; + } + + @Override + public String toString() + { + return "ActiveMQQueue[" + name + "]"; + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueBrowser.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueBrowser.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueBrowser.java new file mode 100644 index 0000000..eacff5f --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueBrowser.java @@ -0,0 +1,169 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import java.util.Enumeration; +import java.util.NoSuchElementException; + +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueBrowser; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientConsumer; +import org.apache.activemq.api.core.client.ClientMessage; +import org.apache.activemq.api.core.client.ClientSession; + +/** + * ActiveMQ implementation of a JMS QueueBrowser. + * + * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> + * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a> + * + */ +public final class ActiveMQQueueBrowser implements QueueBrowser +{ + // Constants ------------------------------------------------------------------------------------ + + // Static --------------------------------------------------------------------------------------- + + // Attributes ----------------------------------------------------------------------------------- + + private final ClientSession session; + + private ClientConsumer consumer; + + private final ActiveMQQueue queue; + + private SimpleString filterString; + + // Constructors --------------------------------------------------------------------------------- + + protected ActiveMQQueueBrowser(final ActiveMQQueue queue, final String messageSelector, final ClientSession session) throws JMSException + { + this.session = session; + this.queue = queue; + if (messageSelector != null) + { + filterString = new SimpleString(SelectorTranslator.convertToActiveMQFilterString(messageSelector)); + } + } + + // QueueBrowser implementation ------------------------------------------------------------------- + + public void close() throws JMSException + { + if (consumer != null) + { + try + { + consumer.close(); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + } + + public Enumeration getEnumeration() throws JMSException + { + try + { + close(); + + consumer = session.createConsumer(queue.getSimpleAddress(), filterString, true); + + return new BrowserEnumeration(); + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + + } + + public String getMessageSelector() throws JMSException + { + return filterString == null ? null : filterString.toString(); + } + + public Queue getQueue() throws JMSException + { + return queue; + } + + // Public --------------------------------------------------------------------------------------- + + @Override + public String toString() + { + return "ActiveMQQueueBrowser->" + consumer; + } + + // Package protected ---------------------------------------------------------------------------- + + // Protected ------------------------------------------------------------------------------------ + + // Private -------------------------------------------------------------------------------------- + + // Inner classes -------------------------------------------------------------------------------- + + private final class BrowserEnumeration implements Enumeration<ActiveMQMessage> + { + ClientMessage current = null; + + public boolean hasMoreElements() + { + if (current == null) + { + try + { + current = consumer.receiveImmediate(); + } + catch (ActiveMQException e) + { + return false; + } + } + return current != null; + } + + public ActiveMQMessage nextElement() + { + ActiveMQMessage msg; + if (hasMoreElements()) + { + ClientMessage next = current; + current = null; + msg = ActiveMQMessage.createMessage(next, session); + try + { + msg.doBeforeReceive(); + } + catch (Exception e) + { + ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(e); + + return null; + } + return msg; + } + else + { + throw new NoSuchElementException(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueConnectionFactory.java new file mode 100644 index 0000000..84f5fa9 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueConnectionFactory.java @@ -0,0 +1,69 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.QueueConnectionFactory; + +import org.apache.activemq.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.api.core.client.ServerLocator; +import org.apache.activemq.api.jms.JMSFactoryType; + +/** + * A class that represents a QueueConnectionFactory. + * + * @author <a href="mailto:h...@redhat.com">Howard Gao</a> + */ +public class ActiveMQQueueConnectionFactory extends ActiveMQConnectionFactory implements QueueConnectionFactory +{ + private static final long serialVersionUID = 5312455021322463546L; + + /** + * + */ + public ActiveMQQueueConnectionFactory() + { + super(); + } + + /** + * @param serverLocator + */ + public ActiveMQQueueConnectionFactory(ServerLocator serverLocator) + { + super(serverLocator); + } + + /** + * @param ha + * @param groupConfiguration + */ + public ActiveMQQueueConnectionFactory(boolean ha, final DiscoveryGroupConfiguration groupConfiguration) + { + super(ha, groupConfiguration); + } + + /** + * @param ha + * @param initialConnectors + */ + public ActiveMQQueueConnectionFactory(boolean ha, TransportConfiguration... initialConnectors) + { + super(ha, initialConnectors); + } + + public int getFactoryType() + { + return JMSFactoryType.QUEUE_CF.intValue(); + } +}