http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java new file mode 100644 index 0000000..813915d --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -0,0 +1,872 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.protocol.amqp.broker; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.RefCountMessage; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; +import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.WritableBuffer; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.impl.MessageImpl; + +// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format +public class AMQPMessage extends RefCountMessage { + + final long 
messageFormat; + ByteBuf data; + boolean bufferValid; + byte type; + long messageID; + String address; + MessageImpl protonMessage; + private volatile int memoryEstimate = -1; + private long expiration = 0; + // this is to store where to start sending bytes, ignoring header and delivery annotations. + private int sendFrom = -1; + private boolean parsedHeaders = false; + private Header _header; + private DeliveryAnnotations _deliveryAnnotations; + private MessageAnnotations _messageAnnotations; + private Properties _properties; + private ApplicationProperties applicationProperties; + private long scheduledTime = -1; + + public AMQPMessage(long messageFormat, byte[] data) { + this.data = Unpooled.wrappedBuffer(data); + this.messageFormat = messageFormat; + this.bufferValid = true; + + } + + /** for persistence reload */ + public AMQPMessage(long messageFormat) { + this.messageFormat = messageFormat; + this.bufferValid = false; + + } + + public AMQPMessage(long messageFormat, Message message) { + this.messageFormat = messageFormat; + this.protonMessage = (MessageImpl)message; + + } + + public AMQPMessage(Message message) { + this(0, message); + } + + public MessageImpl getProtonMessage() { + if (protonMessage == null) { + protonMessage = (MessageImpl) Message.Factory.create(); + + if (data != null) { + data.readerIndex(0); + protonMessage.decode(data.nioBuffer()); + this._header = protonMessage.getHeader(); + protonMessage.setHeader(null); + } + } + + return protonMessage; + } + + private void initalizeObjects() { + if (protonMessage == null) { + if (data == null) { + this.sendFrom = -1; + _header = new Header(); + _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); + _properties = new Properties(); + this.applicationProperties = new ApplicationProperties(new HashMap<>()); + this.protonMessage = (MessageImpl)Message.Factory.create(); + this.protonMessage.setApplicationProperties(applicationProperties); + 
this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations); + } + } + } + + private Map getApplicationPropertiesMap() { + + ApplicationProperties appMap = getApplicationProperties(); + Map map = null; + + if (appMap != null) { + map = appMap.getValue(); + } + + if (map == null) { + return Collections.emptyMap(); + } else { + return map; + } + } + + private ApplicationProperties getApplicationProperties() { + parseHeaders(); + return applicationProperties; + } + + private void parseHeaders() { + if (!parsedHeaders) { + if (data == null) { + initalizeObjects(); + } else { + partialDecode(data.nioBuffer()); + } + parsedHeaders = true; + } + } + + public MessageAnnotations getMessageAnnotations() { + parseHeaders(); + return _messageAnnotations; + } + + public Header getHeader() { + parseHeaders(); + return _header; + } + + public Properties getProperties() { + parseHeaders(); + return _properties; + } + + private Object getSymbol(String symbol) { + return getSymbol(Symbol.getSymbol(symbol)); + } + + private Object getSymbol(Symbol symbol) { + MessageAnnotations annotations = getMessageAnnotations(); + Map mapAnnotations = annotations != null ? annotations.getValue() : null; + if (mapAnnotations != null) { + return mapAnnotations.get(symbol); + } + + return null; + } + + + private void setSymbol(String symbol, Object value) { + setSymbol(Symbol.getSymbol(symbol), value); + } + + private void setSymbol(Symbol symbol, Object value) { + MessageAnnotations annotations = getMessageAnnotations(); + Map mapAnnotations = annotations != null ? 
annotations.getValue() : null; + if (mapAnnotations != null) { + mapAnnotations.put(symbol, value); + } + } + + @Override + public RoutingType getRouteType() { + + /* TODO-now How to use this properly + switch (((Byte)type).byteValue()) { + case AMQPMessageSupport.QUEUE_TYPE: + case AMQPMessageSupport.TEMP_QUEUE_TYPE: + return RoutingType.ANYCAST; + + case AMQPMessageSupport.TOPIC_TYPE: + case AMQPMessageSupport.TEMP_TOPIC_TYPE: + return RoutingType.MULTICAST; + default: + return null; + } */ + + + return null; + } + + + + @Override + public Long getScheduledDeliveryTime() { + + if (scheduledTime < 0) { + Object objscheduledTime = getSymbol("x-opt-delivery-time"); + Object objdelay = getSymbol("x-opt-delivery-delay"); + + if (objscheduledTime != null && objscheduledTime instanceof Number) { + this.scheduledTime = ((Number) objscheduledTime).longValue(); + } else if (objdelay != null && objdelay instanceof Number) { + this.scheduledTime = System.currentTimeMillis() + ((Number) objdelay).longValue(); + } else { + this.scheduledTime = 0; + } + } + + return scheduledTime == 0 ? 
null : scheduledTime; + } + + @Override + public AMQPMessage setScheduledDeliveryTime(Long time) { + parseHeaders(); + setSymbol(AMQPMessageSupport.JMS_DELIVERY_TIME, time); + return this; + } + + @Override + public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() { + return AMQPMessagePersister.getInstance(); + } + + private synchronized void partialDecode(ByteBuffer buffer) { + DecoderImpl decoder = TLSEncode.getDecoder(); + decoder.setByteBuffer(buffer); + buffer.position(0); + + _header = null; + _deliveryAnnotations = null; + _messageAnnotations = null; + _properties = null; + applicationProperties = null; + Section section = null; + + try { + if (buffer.hasRemaining()) { + section = (Section) decoder.readObject(); + } + + if (section instanceof Header) { + sendFrom = buffer.position(); + _header = (Header) section; + + if (_header.getTtl() != null) { + this.expiration = System.currentTimeMillis() + _header.getTtl().intValue(); + } + + if (buffer.hasRemaining()) { + section = (Section) decoder.readObject(); + } else { + section = null; + } + } else { + // meaning there is no header + sendFrom = 0; + } + if (section instanceof DeliveryAnnotations) { + _deliveryAnnotations = (DeliveryAnnotations) section; + sendFrom = buffer.position(); + + if (buffer.hasRemaining()) { + section = (Section) decoder.readObject(); + } else { + section = null; + } + + } + if (section instanceof MessageAnnotations) { + _messageAnnotations = (MessageAnnotations) section; + + if (buffer.hasRemaining()) { + section = (Section) decoder.readObject(); + } else { + section = null; + } + + } + if (section instanceof Properties) { + _properties = (Properties) section; + + if (buffer.hasRemaining()) { + section = (Section) decoder.readObject(); + } else { + section = null; + } + } + + if (section instanceof ApplicationProperties) { + applicationProperties = (ApplicationProperties) section; + } + } finally { + decoder.setByteBuffer(null); + } + } + + public long 
getMessageFormat() { + return messageFormat; + } + + public int getLength() { + return data.array().length; + } + + public byte[] getArray() { + return data.array(); + } + + @Override + public void messageChanged() { + bufferValid = false; + this.data = null; + } + + @Override + public ByteBuf getBuffer() { + if (data == null) { + return null; + } else { + return Unpooled.wrappedBuffer(data); + } + } + + @Override + public AMQPMessage setBuffer(ByteBuf buffer) { + this.data = null; + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message copy() { + checkBuffer(); + AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array()); + return newEncode; + } + + @Override + public org.apache.activemq.artemis.api.core.Message copy(long newID) { + checkBuffer(); + return copy().setMessageID(newID); + } + + @Override + public long getMessageID() { + return messageID; + } + + @Override + public org.apache.activemq.artemis.api.core.Message setMessageID(long id) { + this.messageID = id; + return this; + } + + @Override + public long getExpiration() { + return expiration; + } + + @Override + public AMQPMessage setExpiration(long expiration) { + this.expiration = expiration; + return this; + } + + @Override + public Object getUserID() { + Properties properties = getProperties(); + if (properties != null && properties.getUserId() != null) { + return properties.getUserId(); + } else { + return this; + } + } + + @Override + public org.apache.activemq.artemis.api.core.Message setUserID(Object userID) { + return null; + } + + @Override + public boolean isDurable() { + if (getHeader() != null && getHeader().getDurable() != null) { + return getHeader().getDurable().booleanValue(); + } else { + return false; + } + } + + @Override + public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) { + return null; + } + + @Override + public String getAddress() { + if (address == null) { + Properties properties = 
getProtonMessage().getProperties(); + if (properties != null) { + return properties.getTo(); + } else { + return null; + } + } else { + return address; + } + } + + @Override + public AMQPMessage setAddress(String address) { + this.address = address; + return this; + } + + @Override + public AMQPMessage setAddress(SimpleString address) { + return setAddress(address.toString()); + } + + @Override + public SimpleString getAddressSimpleString() { + return SimpleString.toSimpleString(getAddress()); + } + + @Override + public long getTimestamp() { + return 0; + } + + @Override + public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) { + return null; + } + + @Override + public byte getPriority() { + return 0; + } + + @Override + public org.apache.activemq.artemis.api.core.Message setPriority(byte priority) { + return null; + } + + @Override + public void receiveBuffer(ByteBuf buffer) { + + } + + private synchronized void checkBuffer() { + if (!bufferValid) { + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500); + try { + getProtonMessage().encode(new NettyWritable(buffer)); + byte[] bytes = new byte[buffer.writerIndex()]; + buffer.readBytes(bytes); + this.data = Unpooled.wrappedBuffer(bytes); + } finally { + buffer.release(); + } + } + } + + @Override + public void sendBuffer(ByteBuf buffer, int deliveryCount) { + checkBuffer(); + Header header = getHeader(); + if (header == null && deliveryCount > 0) { + header = new Header(); + } + if (header != null) { + synchronized (header) { + header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1)); + TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer)); + TLSEncode.getEncoder().writeObject(header); + TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null); + } + } + buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom); + } + + @Override + public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) { + 
getApplicationPropertiesMap().put(key, Boolean.valueOf(value)); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) { + getApplicationPropertiesMap().put(key, Byte.valueOf(value)); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) { + getApplicationPropertiesMap().put(key, value); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) { + getApplicationPropertiesMap().put(key, Short.valueOf(value)); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) { + getApplicationPropertiesMap().put(key, Character.valueOf(value)); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) { + getApplicationPropertiesMap().put(key, Integer.valueOf(value)); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) { + getApplicationPropertiesMap().put(key, Long.valueOf(value)); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) { + getApplicationPropertiesMap().put(key, Float.valueOf(value)); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) { + getApplicationPropertiesMap().put(key, Double.valueOf(value)); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) { + getApplicationPropertiesMap().put(key, Boolean.valueOf(value)); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putByteProperty(SimpleString key, byte value) { + return putByteProperty(key.toString(), value); + 
} + + @Override + public org.apache.activemq.artemis.api.core.Message putBytesProperty(SimpleString key, byte[] value) { + return putBytesProperty(key.toString(), value); + } + + @Override + public org.apache.activemq.artemis.api.core.Message putShortProperty(SimpleString key, short value) { + return putShortProperty(key.toString(), value); + } + + @Override + public org.apache.activemq.artemis.api.core.Message putCharProperty(SimpleString key, char value) { + return putCharProperty(key.toString(), value); + } + + @Override + public org.apache.activemq.artemis.api.core.Message putIntProperty(SimpleString key, int value) { + return putIntProperty(key.toString(), value); + } + + @Override + public org.apache.activemq.artemis.api.core.Message putLongProperty(SimpleString key, long value) { + return putLongProperty(key.toString(), value); + } + + @Override + public org.apache.activemq.artemis.api.core.Message putFloatProperty(SimpleString key, float value) { + return putFloatProperty(key.toString(), value); + } + + @Override + public org.apache.activemq.artemis.api.core.Message putDoubleProperty(SimpleString key, double value) { + return putDoubleProperty(key.toString(), value); + } + + @Override + public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) { + getApplicationPropertiesMap().put(key, value); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, + Object value) throws ActiveMQPropertyConversionException { + getApplicationPropertiesMap().put(key, value); + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putObjectProperty(SimpleString key, + Object value) throws ActiveMQPropertyConversionException { + return putObjectProperty(key.toString(), value); + } + + @Override + public Object removeProperty(String key) { + return getApplicationPropertiesMap().remove(key); + } + + @Override + public boolean containsProperty(String 
key) { + return getApplicationPropertiesMap().containsKey(key); + } + + @Override + public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException { + return (Boolean)getApplicationPropertiesMap().get(key); + } + + @Override + public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException { + return (Byte)getApplicationPropertiesMap().get(key); + } + + @Override + public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException { + return (Double)getApplicationPropertiesMap().get(key); + } + + @Override + public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException { + return (Integer)getApplicationPropertiesMap().get(key); + } + + @Override + public Long getLongProperty(String key) throws ActiveMQPropertyConversionException { + return (Long)getApplicationPropertiesMap().get(key); + } + + @Override + public Object getObjectProperty(String key) { + if (key.equals("JMSType")) { + return getProperties().getSubject(); + } + + return getApplicationPropertiesMap().get(key); + } + + @Override + public Short getShortProperty(String key) throws ActiveMQPropertyConversionException { + return (Short)getApplicationPropertiesMap().get(key); + } + + @Override + public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException { + return (Float)getApplicationPropertiesMap().get(key); + } + + @Override + public String getStringProperty(String key) throws ActiveMQPropertyConversionException { + if (key.equals("JMSType")) { + return getProperties().getSubject(); + } + return (String)getApplicationPropertiesMap().get(key); + } + + @Override + public boolean containsDeliveryAnnotationProperty(SimpleString key) { + parseHeaders(); + if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) { + return false; + } + return _deliveryAnnotations.getValue().containsKey(key.toString()); + } + + @Override + public Object removeDeliveryAnnoationProperty(SimpleString 
key) { + parseHeaders(); + if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) { + return null; + } + return _deliveryAnnotations.getValue().remove(key.toString()); + } + + @Override + public Object getDeliveryAnnotationProperty(SimpleString key) { + return null; + } + + @Override + public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { + return SimpleString.toSimpleString((String)getApplicationPropertiesMap().get(key)); + } + + @Override + public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException { + return (byte[]) getApplicationPropertiesMap().get(key); + } + + @Override + public Object removeProperty(SimpleString key) { + return removeProperty(key.toString()); + } + + @Override + public boolean containsProperty(SimpleString key) { + return containsProperty(key.toString()); + } + + @Override + public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return getBooleanProperty(key.toString()); + } + + @Override + public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return getByteProperty(key.toString()); + } + + @Override + public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return getDoubleProperty(key.toString()); + } + + @Override + public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return getIntProperty(key.toString()); + } + + @Override + public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return getLongProperty(key.toString()); + } + + @Override + public Object getObjectProperty(SimpleString key) { + return getObjectProperty(key.toString()); + } + + @Override + public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return getShortProperty(key.toString()); + } + + @Override + public Float 
getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return getFloatProperty(key.toString()); + } + + @Override + public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return getStringProperty(key.toString()); + } + + @Override + public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return getSimpleStringProperty(key.toString()); + } + + @Override + public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return getBytesProperty(key.toString()); + } + + @Override + public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, SimpleString value) { + return putStringProperty(key.toString(), value.toString()); + } + + @Override + public int getEncodeSize() { + return 0; + } + + @Override + public Set<SimpleString> getPropertyNames() { + HashSet<SimpleString> values = new HashSet<>(); + for (Object k : getApplicationPropertiesMap().keySet()) { + values.add(SimpleString.toSimpleString(k.toString())); + } + return values; + } + + @Override + public int getMemoryEstimate() { + if (memoryEstimate == -1) { + memoryEstimate = memoryOffset + + (data != null ? data.capacity() : 0); + } + + return memoryEstimate; + } + + @Override + public ICoreMessage toCore() { + try { + return AMQPConverter.getInstance().toCore(this); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + + @Override + public SimpleString getReplyTo() { + if (getProperties() != null) { + return SimpleString.toSimpleString(getProperties().getReplyTo()); + } else { + return null; + } + + } + + @Override + public AMQPMessage setReplyTo(SimpleString address) { + if (getProperties() != null) { + getProperties().setReplyTo(address != null ? 
address.toString() : null); + } + return this; + } + + + @Override + public int getPersistSize() { + checkBuffer(); + return data.array().length + DataConstants.SIZE_INT; + } + + @Override + public void persist(ActiveMQBuffer targetRecord) { + checkBuffer(); + targetRecord.writeInt(data.array().length); + targetRecord.writeBytes(data.array()); + } + + @Override + public void reloadPersistence(ActiveMQBuffer record) { + int size = record.readInt(); + byte[] recordArray = new byte[size]; + record.readBytes(recordArray); + this.data = Unpooled.wrappedBuffer(recordArray); + this.bufferValid = true; + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java new file mode 100644 index 0000000..3b5bdda --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.protocol.amqp.broker; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; +import org.apache.activemq.artemis.utils.DataConstants; + +public class AMQPMessagePersister extends MessagePersister { + + public static AMQPMessagePersister theInstance = new AMQPMessagePersister(); + + public static AMQPMessagePersister getInstance() { + return theInstance; + } + + private AMQPMessagePersister() { + } + + @Override + protected byte getID() { + return ProtonProtocolManagerFactory.ID; + } + + @Override + public int getEncodeSize(Message record) { + return DataConstants.SIZE_BYTE + record.getPersistSize() + + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG; + } + + + /** Sub classes must add the first short as the protocol-id */ + @Override + public void encode(ActiveMQBuffer buffer, Message record) { + super.encode(buffer, record); + AMQPMessage msgEncode = (AMQPMessage)record; + buffer.writeLong(record.getMessageID()); + buffer.writeLong(msgEncode.getMessageFormat()); + buffer.writeNullableSimpleString(record.getAddressSimpleString()); + record.persist(buffer); + } + + + @Override + public Message decode(ActiveMQBuffer buffer, Message record) { + long id = buffer.readLong(); + long format = buffer.readLong(); + SimpleString address = buffer.readNullableSimpleString(); + record = new AMQPMessage(format); + record.reloadPersistence(buffer); + record.setMessageID(id); + if (address != null) { + record.setAddress(address); + } + return record; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java 
---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 18c6b05..5931afe 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; @@ -32,16 +34,13 @@ import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; -import 
org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; -import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; @@ -65,11 +64,9 @@ import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; -import io.netty.buffer.ByteBuf; import org.jboss.logging.Logger; public class AMQPSessionCallback implements SessionCallback { @@ -298,13 +295,6 @@ public class AMQPSessionCallback implements SessionCallback { } } - public long encodeMessage(Object message, int deliveryCount, WritableBuffer buffer) throws Exception { - ProtonMessageConverter converter = (ProtonMessageConverter) manager.getConverter(); - - // The Proton variant accepts a WritableBuffer to allow for a faster more direct encode. 
- return (long) converter.outbound((ServerMessage) message, deliveryCount, buffer); - } - public String tempQueueName() { return UUIDGenerator.getInstance().generateStringUUID(); } @@ -321,22 +311,22 @@ public class AMQPSessionCallback implements SessionCallback { } } - public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception { + public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception { if (transaction == null) { transaction = serverSession.getCurrentTransaction(); } recoverContext(); try { - ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID()); + ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID()); } finally { resetContext(); } } - public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception { + public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception { recoverContext(); try { - ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts); + ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts); } finally { resetContext(); } @@ -351,11 +341,8 @@ public class AMQPSessionCallback implements SessionCallback { final Delivery delivery, String address, int messageFormat, - ByteBuf messageEncoded) throws Exception { - EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex()); - - ServerMessage message = manager.getConverter().inbound(encodedMessage); - //use the address on the receiver if not null, if null let's hope it was set correctly on the message + byte[] data) throws Exception { + AMQPMessage message = new AMQPMessage(messageFormat, data); if (address != null) { message.setAddress(new SimpleString(address)); } else { @@ -372,7 +359,7 @@ 
public class AMQPSessionCallback implements SessionCallback { recoverContext(); - PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress()); + PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString()); if (store.isRejectingMessages()) { // We drop pre-settled messages (and abort any associated Tx) if (delivery.remotelySettled()) { @@ -401,12 +388,12 @@ public class AMQPSessionCallback implements SessionCallback { } private void serverSend(final Transaction transaction, - final ServerMessage message, + final Message message, final Delivery delivery, final Receiver receiver) throws Exception { try { - message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer()); +// message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer()); serverSession.send(transaction, message, false, false); // FIXME Potential race here... 
@@ -416,8 +403,8 @@ public class AMQPSessionCallback implements SessionCallback { synchronized (connection.getLock()) { delivery.disposition(Accepted.getInstance()); delivery.settle(); - connection.flush(); } + connection.flush(true); } @Override @@ -492,14 +479,14 @@ public class AMQPSessionCallback implements SessionCallback { } @Override - public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString()); ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); try { - return plugSender.deliverMessage(message, deliveryCount); + return plugSender.deliverMessage(CoreAmqpConverter.checkAMQP(message), deliveryCount); } catch (Exception e) { synchronized (connection.getLock()) { plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage())); @@ -512,7 +499,7 @@ public class AMQPSessionCallback implements SessionCallback { @Override public int sendLargeMessage(MessageReference ref, - ServerMessage message, + Message message, ServerConsumer consumer, long bodySize, int deliveryCount) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 754172a..9c7d24d 100644 --- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -26,19 +26,17 @@ import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -54,8 +52,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti private final ActiveMQServer server; - private MessageConverter protonConverter; - private final ProtonProtocolManagerFactory factory; private final Map<SimpleString, RoutingType> prefixes = 
new HashMap<>(); @@ -72,7 +68,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; - this.protonConverter = new ProtonMessageConverter(server.getStorageManager()); } public ActiveMQServer getServer() { @@ -80,11 +75,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti } @Override - public MessageConverter getConverter() { - return protonConverter; - } - - @Override public void onNotification(Notification notification) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java index bef8ef0..98ec228 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java @@ -22,6 +22,8 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import 
org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -32,6 +34,8 @@ import org.osgi.service.component.annotations.Component; @Component(service = ProtocolManagerFactory.class) public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> { + public static final byte ID = 2; + private static final String AMQP_PROTOCOL_NAME = "AMQP"; private static final String MODULE_NAME = "artemis-amqp-protocol"; @@ -39,6 +43,16 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME}; @Override + public byte getStoreID() { + return ID; + } + + @Override + public Persister<Message> getPersister() { + return AMQPMessagePersister.getInstance(); + } + + @Override public ProtocolManager createProtocolManager(ActiveMQServer server, final Map<String, Object> parameters, List<BaseInterceptor> incomingInterceptors,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java
new file mode 100644
index 0000000..e040138
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.StandardCharsets;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.Locale;
+import java.util.StringTokenizer;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
+
+/**
+ * Helper for interpreting the content-type of an AMQP message, used to decide whether a
+ * message body should be handled as text and, if so, with which character set.
+ */
+public final class AMQPContentTypeSupport {
+
+   private static final String UTF_8 = "UTF-8";
+   private static final String CHARSET = "charset";
+   private static final String TEXT = "text";
+   private static final String APPLICATION = "application";
+   private static final String JAVASCRIPT = "javascript";
+   private static final String XML = "xml";
+   private static final String XML_VARIANT = "+xml";
+   private static final String JSON = "json";
+   private static final String JSON_VARIANT = "+json";
+   private static final String XML_DTD = "xml-dtd";
+   private static final String ECMASCRIPT = "ecmascript";
+
+   /**
+    * Determines the character set of a textual content-type.
+    * <p>
+    * Type, subtype and charset value are compared case-insensitively using locale-independent
+    * case mapping, so the result does not depend on the JVM default locale. When a textual
+    * content-type carries no charset parameter, UTF-8 is assumed.
+    *
+    * @param contentType
+    *        the contentType of the received message
+    * @return the character set to use, or null if not to treat the message as text
+    * @throws ActiveMQAMQPInvalidContentTypeException
+    *         if the content-type is invalid in some way.
+    */
+   public static Charset parseContentTypeForTextualCharset(final String contentType) throws ActiveMQAMQPInvalidContentTypeException {
+      if (contentType == null || contentType.trim().isEmpty()) {
+         throw new ActiveMQAMQPInvalidContentTypeException("Content type can't be null or empty");
+      }
+
+      int subTypeSeparator = contentType.indexOf("/");
+      if (subTypeSeparator == -1) {
+         throw new ActiveMQAMQPInvalidContentTypeException("Content type has no '/' separator: " + contentType);
+      }
+
+      // Locale.ROOT keeps the comparison against the lower-case constants stable in locales
+      // with special casing rules (e.g. Turkish, where 'I' lower-cases to a dotless 'i').
+      final String type = contentType.substring(0, subTypeSeparator).toLowerCase(Locale.ROOT).trim();
+
+      String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase(Locale.ROOT).trim();
+
+      String parameterPart = null;
+      int parameterSeparator = subTypePart.indexOf(";");
+      if (parameterSeparator != -1) {
+         if (parameterSeparator < subTypePart.length() - 1) {
+            // Only the text after the first ';' holds parameters; the subtype is not one.
+            parameterPart = subTypePart.substring(parameterSeparator + 1).trim();
+         }
+         subTypePart = subTypePart.substring(0, parameterSeparator).trim();
+      }
+
+      if (subTypePart.isEmpty()) {
+         throw new ActiveMQAMQPInvalidContentTypeException("Content type has no subtype after '/': " + contentType);
+      }
+
+      final String subType = subTypePart;
+
+      if (isTextual(type, subType)) {
+         String charset = findCharset(parameterPart);
+         if (charset == null) {
+            charset = UTF_8;
+         }
+
+         if (UTF_8.equals(charset)) {
+            return StandardCharsets.UTF_8;
+         } else {
+            try {
+               return Charset.forName(charset);
+            } catch (IllegalCharsetNameException icne) {
+               throw new ActiveMQAMQPInvalidContentTypeException("Illegal charset: " + charset);
+            } catch (UnsupportedCharsetException uce) {
+               throw new ActiveMQAMQPInvalidContentTypeException("Unsupported charset: " + charset);
+            }
+         }
+      }
+
+      return null;
+   }
+
+   // ----- Internal Content Type utilities ----------------------------------//
+
+   /**
+    * Tells whether the given lower-case type/subtype pair denotes textual content: any
+    * {@code text/*} type, or one of the XML, JSON and script flavours of {@code application/*}.
+    */
+   private static boolean isTextual(String type, String subType) {
+      if (TEXT.equals(type)) {
+         return true;
+      }
+
+      if (APPLICATION.equals(type)) {
+         if (XML.equals(subType) || JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT)
+            || XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
+   /**
+    * Extracts the value of the first {@code charset} parameter from a ';'-separated parameter
+    * list.
+    *
+    * @param parameterPart
+    *        the parameter section of the content-type, may be null
+    * @return the charset name in upper case with surrounding quotes removed, or null if no
+    *         charset parameter is present
+    */
+   private static String findCharset(String parameterPart) {
+      String charset = null;
+
+      if (parameterPart != null) {
+         StringTokenizer tokenizer = new StringTokenizer(parameterPart, ";");
+         while (tokenizer.hasMoreTokens()) {
+            String parameter = tokenizer.nextToken().trim();
+            int eqIndex = parameter.indexOf('=');
+            if (eqIndex != -1) {
+               String name = parameter.substring(0, eqIndex);
+               if (CHARSET.equalsIgnoreCase(name.trim())) {
+                  // Trim the value as well as the name so "charset = utf-8" is accepted.
+                  String value = unquote(parameter.substring(eqIndex + 1).trim());
+
+                  charset = value.toUpperCase(Locale.ROOT);
+                  break;
+               }
+            }
+         }
+      }
+
+      return charset;
+   }
+
+   /**
+    * Removes one pair of enclosing double quotes from the given string, if present.
+    */
+   private static String unquote(String s) {
+      if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) {
+         return s.substring(1, s.length() - 1);
+      } else {
+         return s;
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
new file mode 100644
index 0000000..724474b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+
+/**
+ * {@link MessageConverter} singleton that translates between core messages and
+ * {@link AMQPMessage} instances by delegating to {@link CoreAmqpConverter} and
+ * {@link AmqpCoreConverter}.
+ */
+public class AMQPConverter implements MessageConverter<AMQPMessage> {
+
+   private static final AMQPConverter theInstance = new AMQPConverter();
+
+   // Instances are only handed out through getInstance().
+   private AMQPConverter() {
+   }
+
+   /**
+    * @return the shared converter instance
+    */
+   public static AMQPConverter getInstance() {
+      return theInstance;
+   }
+
+   @Override
+   public AMQPMessage fromCore(ICoreMessage coreMessage) throws Exception {
+      return CoreAmqpConverter.fromCore(coreMessage);
+   }
+
+   @Override
+   public ICoreMessage toCore(AMQPMessage messageSource) throws Exception {
+      return AmqpCoreConverter.toCore(messageSource);
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java new file mode 100644 index 0000000..00282e0 ---
/dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
@@ -0,0 +1,238 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * Converts message-id and correlation-id values between the AMQP types and the String form
+ * used by JMS.
+ * <p>
+ * AMQP allows four kinds of message-id/correlation-id: message-id-string, message-id-binary,
+ * message-id-uuid and message-id-ulong. So that every kind can be carried as a String and
+ * restored again, non-string values are written with a type prefix (applied after removing,
+ * or before adding, the "ID:" prefix of a JMSMessageID value):<br>
+ * <p>
+ * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
+ * {@literal "AMQP_UUID:<string representation of uuid>"}<br>
+ * {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
+ * {@literal "AMQP_STRING:<string>"}<br>
+ * <p>
+ * The AMQP_STRING prefix is used only to escape message-id-string values that themselves start
+ * with one of the prefixes above (AMQP_STRING included). It MUST NOT be used otherwise.
+ * <p>
+ * A String that announces itself as an encoded binary, uuid or ulong but cannot be converted
+ * to that type is rejected with an exception.
+ */
+public class AMQPMessageIdHelper {
+
+   public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper();
+
+   public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
+   public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
+   public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
+   public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
+
+   private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
+   private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
+   private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
+   private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
+   private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
+
+   /**
+    * Renders an AMQP message-id style object as a base String, adding a type prefix where one
+    * is needed to convey, or to escape, the type of the value.
+    *
+    * @param messageId
+    *        the raw messageId object to process, may be null
+    * @return the base string to be used in creating the actual id, or null if messageId is null
+    * @throws IllegalArgumentException
+    *         if messageId is not a String, UUID, UnsignedLong or Binary
+    */
+   public String toBaseMessageIdString(Object messageId) {
+      if (messageId == null) {
+         return null;
+      }
+
+      if (messageId instanceof String) {
+         final String text = (String) messageId;
+
+         // A string that already starts with a type prefix must be escaped as an encoded
+         // string, even when that prefix is the string prefix itself.
+         return hasTypeEncodingPrefix(text) ? AMQP_STRING_PREFIX + text : text;
+      }
+
+      if (messageId instanceof UUID) {
+         return AMQP_UUID_PREFIX + messageId.toString();
+      }
+
+      if (messageId instanceof UnsignedLong) {
+         return AMQP_ULONG_PREFIX + messageId.toString();
+      }
+
+      if (messageId instanceof Binary) {
+         final ByteBuffer content = ((Binary) messageId).asByteBuffer();
+         final byte[] raw = new byte[content.remaining()];
+         content.get(raw);
+
+         return AMQP_BINARY_PREFIX + convertBinaryToHexString(raw);
+      }
+
+      throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
+   }
+
+   /**
+    * Restores the AMQP message-id style object described by a base id String, using any type
+    * prefix found at its start to pick the target type.
+    *
+    * @param baseId
+    *        the String to be converted to an AMQP MessageId value, may be null
+    * @return the AMQP messageId style object, or null if baseId is null
+    * @throws ActiveMQAMQPIllegalStateException
+    *         if converting the prefixed value fails with an IllegalArgumentException, i.e. the
+    *         String indicates an encoded type but can't be converted to that type
+    */
+   public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException {
+      if (baseId == null) {
+         return null;
+      }
+
+      try {
+         if (hasAmqpUuidPrefix(baseId)) {
+            return UUID.fromString(baseId.substring(AMQP_UUID_PREFIX_LENGTH));
+         }
+
+         if (hasAmqpUlongPrefix(baseId)) {
+            return UnsignedLong.valueOf(baseId.substring(AMQP_ULONG_PREFIX_LENGTH));
+         }
+
+         if (hasAmqpStringPrefix(baseId)) {
+            return baseId.substring(AMQP_STRING_PREFIX_LENGTH);
+         }
+
+         if (hasAmqpBinaryPrefix(baseId)) {
+            return new Binary(convertHexStringToBinary(baseId.substring(AMQP_BINARY_PREFIX_LENGTH)));
+         }
+
+         // No type prefix at all: the string is the id, transmit it as-is.
+         return baseId;
+      } catch (IllegalArgumentException e) {
+         throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value");
+      }
+   }
+
+   /**
+    * Decodes a hex String into bytes, two hex characters per byte. Both upper and lower case
+    * hex characters are accepted.
+    *
+    * @param hexString
+    *        string to convert to a binary value.
+    * @return a byte array containing the binary representation
+    * @throws IllegalArgumentException
+    *         if the provided String is a non-even length or contains non-hex characters
+    */
+   public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
+      final int charCount = hexString.length();
+
+      // Every byte takes two hex characters, so an odd length can never be valid.
+      if ((charCount % 2) != 0) {
+         throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + charCount + ": " + hexString);
+      }
+
+      final byte[] decoded = new byte[charCount / 2];
+
+      for (int index = 0; index < decoded.length; index++) {
+         final int high = hexCharToInt(hexString.charAt(2 * index), hexString);
+         final int low = hexCharToInt(hexString.charAt(2 * index + 1), hexString);
+
+         decoded[index] = (byte) ((high << 4) | low);
+      }
+
+      return decoded;
+   }
+
+   /**
+    * Encodes bytes as a hex String, two upper-case hex characters per byte.
+    *
+    * @param bytes
+    *        the binary value to convert to a hex String instance.
+    * @return a String containing a hex representation of the bytes
+    */
+   public String convertBinaryToHexString(byte[] bytes) {
+      final char[] hex = new char[bytes.length * 2];
+
+      int out = 0;
+      for (byte value : bytes) {
+         // The byte is widened to int (sign-extended) before shifting, so mask down to the
+         // four bits that are wanted in each case.
+         hex[out++] = HEX_CHARS[(value >> 4) & 0xF];
+         hex[out++] = HEX_CHARS[value & 0xF];
+      }
+
+      return new String(hex);
+   }
+
+   // ----- Internal implementation ------------------------------------------//
+
+   // True when the id starts with any of the four type prefixes.
+   private boolean hasTypeEncodingPrefix(String stringId) {
+      return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
+   }
+
+   private boolean hasAmqpStringPrefix(String stringId) {
+      return stringId.startsWith(AMQP_STRING_PREFIX);
+   }
+
+   private boolean hasAmqpUlongPrefix(String stringId) {
+      return stringId.startsWith(AMQP_ULONG_PREFIX);
+   }
+
+   private boolean hasAmqpUuidPrefix(String stringId) {
+      return stringId.startsWith(AMQP_UUID_PREFIX);
+   }
+
+   private boolean hasAmqpBinaryPrefix(String stringId) {
+      return stringId.startsWith(AMQP_BINARY_PREFIX);
+   }
+
+   // Maps one hex digit (either case) to its value 0-15; orig is only used in the error text.
+   private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
+      if ('0' <= ch && ch <= '9') {
+         return ch - '0';
+      }
+
+      if ('A' <= ch && ch <= 'F') {
+         // distance from 'A', shifted by the ten decimal digits that come first
+         return 10 + (ch - 'A');
+      }
+
+      if ('a' <= ch && ch <= 'f') {
+         return 10 + (ch - 'a');
+      }
+
+      throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java new file mode 100644 index 0000000..0dd54db --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.converter; + +import javax.jms.Destination; +import javax.jms.JMSException; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.message.Message; + +import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE; +import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE; +import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE; +import 
static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;

/**
 * Support class containing constant values and static methods that are used to map to / from
 * AMQP Message types being sent or received.
 */
public final class AMQPMessageSupport {

   public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to";

   // Message Properties used to map AMQP to JMS and back
   /**
    * Attribute used to mark the class type of JMS message that a particular message
    * instance represents, used internally by the client.
    */
   public static final Symbol JMS_MSG_TYPE = Symbol.getSymbol("x-opt-jms-msg-type");

   /**
    * Attribute used to mark the Application defined delivery time assigned to the message
    */
   public static final Symbol JMS_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");

   /**
    * Value mapping for JMS_MSG_TYPE which indicates the message is a generic JMS Message
    * which has no body.
    */
   public static final byte JMS_MESSAGE = 0;

   /**
    * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS ObjectMessage
    * which has an Object value serialized in its message body.
    */
   public static final byte JMS_OBJECT_MESSAGE = 1;

   /**
    * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS MapMessage
    * which has a Map instance serialized in its message body.
    */
   public static final byte JMS_MAP_MESSAGE = 2;

   /**
    * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS BytesMessage
    * which has a body that consists of raw bytes.
    */
   public static final byte JMS_BYTES_MESSAGE = 3;

   /**
    * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS StreamMessage
    * which has a body that is a structured collection of primitive values.
    */
   public static final byte JMS_STREAM_MESSAGE = 4;

   /**
    * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS TextMessage
    * which has a body that contains a UTF-8 encoded String.
    */
   public static final byte JMS_TEXT_MESSAGE = 5;

   /**
    * Content type used to mark Data sections as containing a serialized java object.
    */
   public static final Symbol SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = Symbol.getSymbol("application/x-java-serialized-object");

   public static final String JMS_AMQP_PREFIX = "JMS_AMQP_";
   public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length();

   public static final String NATIVE = "NATIVE";
   public static final String HEADER = "HEADER";
   public static final String PROPERTIES = "PROPERTIES";

   public static final String FIRST_ACQUIRER = "FirstAcquirer";
   public static final String CONTENT_TYPE = "ContentType";
   public static final String CONTENT_ENCODING = "ContentEncoding";
   public static final String REPLYTO_GROUP_ID = "ReplyToGroupID";
   public static final String DURABLE = "DURABLE";
   public static final String PRIORITY = "PRIORITY";

   public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
   public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
   public static final String FOOTER_PREFIX = "FT_";

   public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
   public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE;
   public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY;
   public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES;
   public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE;
   public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER;
   public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE;
   public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING;
   public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID;
   public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX;
   public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX;
   public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX;

   // Message body type definitions
   public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
   public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);

   public static final short AMQP_UNKNOWN = 0;
   public static final short AMQP_NULL = 1;
   public static final short AMQP_DATA = 2;
   public static final short AMQP_SEQUENCE = 3;
   public static final short AMQP_VALUE_NULL = 4;
   public static final short AMQP_VALUE_STRING = 5;
   public static final short AMQP_VALUE_BINARY = 6;
   public static final short AMQP_VALUE_MAP = 7;
   public static final short AMQP_VALUE_LIST = 8;

   public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest");
   // Built from the name constant above so the two can never drift apart.
   public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol(JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);

   public static final byte QUEUE_TYPE = 0x00;
   public static final byte TOPIC_TYPE = 0x01;
   public static final byte TEMP_QUEUE_TYPE = 0x02;
   public static final byte TEMP_TOPIC_TYPE = 0x03;

   /**
    * Content type used to mark Data sections as containing arbitrary bytes.
    */
   public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";

   /**
    * Lookup and return the correct Proton Symbol instance based on the given key.
    *
    * @param key
    *        the String value name of the Symbol to locate.
    *
    * @return the Symbol value that matches the given key.
    */
   public static Symbol getSymbol(String key) {
      return Symbol.valueOf(key);
   }

   /**
    * Safe way to access message annotations which will check internal structure and either
    * return the annotation if it exists or null if the annotation, the annotations section,
    * its underlying map, or the message itself is absent.
    *
    * @param key
    *        the String key to use to lookup an annotation.
    * @param message
    *        the AMQP message object that is being examined, may be null.
    *
    * @return the given annotation value or null if not present in the message.
    */
   public static Object getMessageAnnotation(String key, Message message) {
      if (message == null || message.getMessageAnnotations() == null) {
         return null;
      }

      Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
      if (annotations == null) {
         // An annotations section can exist without a backing map; treat it as empty.
         return null;
      }

      return annotations.get(AMQPMessageSupport.getSymbol(key));
   }

   /**
    * Check whether the content-type field of the properties section (if present) in the given
    * message matches the provided string (where null matches if there is no content type
    * present).
    *
    * @param contentType
    *        content type string to compare against, or null if none
    * @param message
    *        the AMQP message object that is being examined, must not be null.
    *
    * @return true if content type matches
    */
   public static boolean isContentType(String contentType, Message message) {
      if (contentType == null) {
         return message.getContentType() == null;
      } else {
         return contentType.equals(message.getContentType());
      }
   }

   /**
    * Determines the character set to decode a body with, based on its content type.
    *
    * @param contentType
    *        the contentType of the received message
    * @return the character set to use, or null if not to treat the message as text
    */
   public static Charset getCharsetForTextualContent(String contentType) {
      try {
         return AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType);
      } catch (ActiveMQAMQPInvalidContentTypeException e) {
         // An unparseable content type simply means "do not treat as text".
         return null;
      }
   }

   /**
    * Extracts the address from a JMS destination.
    *
    * @param destination
    *        the destination to examine, may be null.
    * @return the address if the destination is an {@link ActiveMQDestination}, otherwise null.
    */
   public static String toAddress(Destination destination) {
      if (destination instanceof ActiveMQDestination) {
         return ((ActiveMQDestination) destination).getAddress();
      }
      return null;
   }

   /**
    * Creates an empty bytes message wrapping a new core message of type BYTES_TYPE.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @return the new bytes message.
    */
   public static ServerJMSBytesMessage createBytesMessage(long id) {
      return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE));
   }

   /**
    * Creates a bytes message and writes the given slice of the array into it.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @param array
    *        the source bytes.
    * @param arrayOffset
    *        index of the first byte to write.
    * @param length
    *        number of bytes to write.
    * @return the new bytes message.
    * @throws JMSException
    *         if writing the bytes fails.
    */
   public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException {
      ServerJMSBytesMessage message = createBytesMessage(id);
      message.writeBytes(array, arrayOffset, length);
      return message;
   }

   /**
    * Creates an empty stream message wrapping a new core message of type STREAM_TYPE.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @return the new stream message.
    */
   public static ServerJMSStreamMessage createStreamMessage(long id) {
      return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE));
   }

   /**
    * Creates a generic message wrapping a new core message of type DEFAULT_TYPE.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @return the new message.
    */
   public static ServerJMSMessage createMessage(long id) {
      return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE));
   }

   /**
    * Creates an empty text message wrapping a new core message of type TEXT_TYPE.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @return the new text message.
    */
   public static ServerJMSTextMessage createTextMessage(long id) {
      return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE));
   }

   /**
    * Creates a text message carrying the given text.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @param text
    *        the text to set on the message.
    * @return the new text message.
    * @throws JMSException
    *         if setting the text fails.
    */
   public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException {
      ServerJMSTextMessage message = createTextMessage(id);
      message.setText(text);
      return message;
   }

   /**
    * Creates an empty object message wrapping a new core message of type OBJECT_TYPE.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @return the new object message.
    */
   public static ServerJMSObjectMessage createObjectMessage(long id) {
      return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE));
   }

   /**
    * Creates an object message whose serialized form is the given binary.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @param serializedForm
    *        the serialized object payload.
    * @return the new object message.
    * @throws JMSException
    *         if setting the serialized form fails.
    */
   public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException {
      ServerJMSObjectMessage message = createObjectMessage(id);
      message.setSerializedForm(serializedForm);
      return message;
   }

   /**
    * Creates an object message whose serialized form is the given slice of the array.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @param array
    *        the array holding the serialized object.
    * @param offset
    *        index of the first byte of the serialized object.
    * @param length
    *        number of bytes making up the serialized object.
    * @return the new object message.
    * @throws JMSException
    *         if setting the serialized form fails.
    */
   public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException {
      ServerJMSObjectMessage message = createObjectMessage(id);
      message.setSerializedForm(new Binary(array, offset, length));
      return message;
   }

   /**
    * Creates an empty map message wrapping a new core message of type MAP_TYPE.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @return the new map message.
    */
   public static ServerJMSMapMessage createMapMessage(long id) {
      return new ServerJMSMapMessage(newMessage(id, MAP_TYPE));
   }

   /**
    * Creates a map message populated from the given content. Values of type {@link Binary}
    * are stored as a byte[] copy of exactly the bytes the Binary covers.
    *
    * @param id
    *        the id to assign to the underlying core message.
    * @param content
    *        the entries to copy into the message, must not be null.
    * @return the new map message.
    * @throws JMSException
    *         if setting an entry on the message fails.
    */
   public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException {
      ServerJMSMapMessage message = createMapMessage(id);
      final Set<Map.Entry<String, Object>> set = content.entrySet();
      for (Map.Entry<String, Object> entry : set) {
         Object value = entry.getValue();
         if (value instanceof Binary) {
            Binary binary = (Binary) value;
            // copyOfRange takes an exclusive END index, not a length, so the
            // upper bound has to be offset + length for non-zero offsets.
            int from = binary.getArrayOffset();
            value = Arrays.copyOfRange(binary.getArray(), from, from + binary.getLength());
         }
         message.setObject(entry.getKey(), value);
      }
      return message;
   }

   /**
    * Creates the core message that backs every message produced by this class.
    *
    * @param id
    *        the id passed to the core message constructor.
    * @param messageType
    *        the core message type to set.
    * @return the new core message.
    */
   private static CoreMessage newMessage(long id, byte messageType) {
      // NOTE(review): 512 is presumably the initial buffer size - confirm against CoreMessage.
      CoreMessage message = new CoreMessage(id, 512);
      message.setType(messageType);
//      ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
      return message;
   }
}
