http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java deleted file mode 100644 index f93086c..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java +++ /dev/null @@ -1,1059 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.message.impl; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; -import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; -import org.apache.activemq.artemis.core.message.BodyEncoder; -import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -import org.apache.activemq.artemis.utils.ByteUtil; -import org.apache.activemq.artemis.utils.DataConstants; -import org.apache.activemq.artemis.utils.TypedProperties; -import org.apache.activemq.artemis.utils.UUID; - -/** - * A concrete implementation of a message - * <p> - * All messages handled by ActiveMQ Artemis core are of this type - */ -public abstract class MessageImpl implements MessageInternal { - - public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO"); - - public static final SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_AMQ_SCALEDOWN_TO"); - - public static final SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_AMQ_ACK_ROUTE_TO"); - - // used by the bridges to set duplicates - public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_AMQ_BRIDGE_DUP"); - - public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE; - - public static final int BODY_OFFSET = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT; - - protected long messageID; - - protected SimpleString address; - - protected byte type; - - protected boolean durable; - - /** - * GMT milliseconds at which this message expires. 0 means never expires * - */ - private long expiration; - - protected long timestamp; - - protected TypedProperties properties; - - protected byte priority; - - protected volatile ActiveMQBuffer buffer; - - protected volatile ResetLimitWrappedActiveMQBuffer bodyBuffer; - - protected volatile boolean bufferValid; - - private int endOfBodyPosition = -1; - - private int endOfMessagePosition; - - private UUID userID; - - // Constructors -------------------------------------------------- - - protected MessageImpl() { - properties = new TypedProperties(); - } - - /** - * overridden by the client message, we need access to the connection so we can create the appropriate ActiveMQBuffer. - * - * @param type - * @param durable - * @param expiration - * @param timestamp - * @param priority - * @param initialMessageBufferSize - */ - protected MessageImpl(final byte type, - final boolean durable, - final long expiration, - final long timestamp, - final byte priority, - final int initialMessageBufferSize) { - this(); - this.type = type; - this.durable = durable; - this.expiration = expiration; - this.timestamp = timestamp; - this.priority = priority; - createBody(initialMessageBufferSize); - } - - protected MessageImpl(final int initialMessageBufferSize) { - this(); - createBody(initialMessageBufferSize); - } - - /* - * Copy constructor - */ - protected MessageImpl(final MessageImpl other) { - this(other, other.getProperties()); - } - - /* - * Copy constructor - */ - protected MessageImpl(final MessageImpl other, TypedProperties properties) { - messageID = other.getMessageID(); - userID = other.getUserID(); - address = other.getAddress(); - type = other.getType(); - durable = other.isDurable(); - expiration = other.getExpiration(); - timestamp = other.getTimestamp(); - priority = other.getPriority(); - this.properties = new TypedProperties(properties); - - // This MUST be synchronized using the monitor on the other message to prevent it running concurrently - // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to - // many subscriptions and bridging to other nodes in a cluster - synchronized (other) { - bufferValid = false; - endOfBodyPosition = -1; - endOfMessagePosition = other.endOfMessagePosition; - - if (other.buffer != null) { - // We need to copy the underlying buffer too, since the different messsages thereafter might have different - // properties set on them, making their encoding different - buffer = other.buffer.copy(0, other.buffer.capacity()); - - buffer.setIndex(other.buffer.readerIndex(), buffer.capacity()); - - bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this); - - bodyBuffer.readerIndex(BODY_OFFSET); - bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex()); - endOfBodyPosition = other.endOfBodyPosition; - } - } - } - - // Message implementation ---------------------------------------- - - @Override - public int getEncodeSize() { - int headersPropsSize = getHeadersAndPropertiesEncodeSize(); - - int bodyPos = getEndOfBodyPosition(); - - int bodySize = bodyPos - BUFFER_HEADER_SPACE - DataConstants.SIZE_INT; - - return DataConstants.SIZE_INT + bodySize + DataConstants.SIZE_INT + headersPropsSize; - } - - @Override - public int getHeadersAndPropertiesEncodeSize() { - return DataConstants.SIZE_LONG + // Message ID - DataConstants.SIZE_BYTE + // user id null? - (userID == null ? 0 : 16) + - /* address */SimpleString.sizeofNullableString(address) + - DataConstants./* Type */SIZE_BYTE + - DataConstants./* Durable */SIZE_BOOLEAN + - DataConstants./* Expiration */SIZE_LONG + - DataConstants./* Timestamp */SIZE_LONG + - DataConstants./* Priority */SIZE_BYTE + - /* PropertySize and Properties */properties.getEncodeSize(); - } - - @Override - public void encodeHeadersAndProperties(final ActiveMQBuffer buffer) { - buffer.writeLong(messageID); - buffer.writeNullableSimpleString(address); - if (userID == null) { - buffer.writeByte(DataConstants.NULL); - } else { - buffer.writeByte(DataConstants.NOT_NULL); - buffer.writeBytes(userID.asBytes()); - } - buffer.writeByte(type); - buffer.writeBoolean(durable); - buffer.writeLong(expiration); - buffer.writeLong(timestamp); - buffer.writeByte(priority); - properties.encode(buffer); - } - - @Override - public void decodeHeadersAndProperties(final ActiveMQBuffer buffer) { - messageID = buffer.readLong(); - address = buffer.readNullableSimpleString(); - if (buffer.readByte() == DataConstants.NOT_NULL) { - byte[] bytes = new byte[16]; - buffer.readBytes(bytes); - userID = new UUID(UUID.TYPE_TIME_BASED, bytes); - } else { - userID = null; - } - type = buffer.readByte(); - durable = buffer.readBoolean(); - expiration = buffer.readLong(); - timestamp = buffer.readLong(); - priority = buffer.readByte(); - properties.decode(buffer); - } - - public void copyHeadersAndProperties(final MessageInternal msg) { - messageID = msg.getMessageID(); - address = msg.getAddress(); - userID = msg.getUserID(); - type = msg.getType(); - durable = msg.isDurable(); - expiration = msg.getExpiration(); - timestamp = msg.getTimestamp(); - priority = msg.getPriority(); - properties = msg.getTypedProperties(); - } - - @Override - public ActiveMQBuffer getBodyBuffer() { - if (bodyBuffer == null) { - bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this); - } - - return bodyBuffer; - } - - @Override - public Message writeBodyBufferBytes(byte[] bytes) { - getBodyBuffer().writeBytes(bytes); - - return this; - } - - @Override - public Message writeBodyBufferString(String string) { - getBodyBuffer().writeString(string); - - return this; - } - - public void checkCompletion() throws ActiveMQException { - // no op on regular messages - } - - @Override - public synchronized ActiveMQBuffer getBodyBufferDuplicate() { - - // Must copy buffer before sending it - - ByteBuf byteBuf = ChannelBufferWrapper.unwrap(getBodyBuffer().byteBuf()); - byteBuf = byteBuf.duplicate(); - byteBuf.readerIndex(getBodyBuffer().readerIndex()); - byteBuf.writerIndex(getBodyBuffer().writerIndex()); - - return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, byteBuf, null); - } - - @Override - public long getMessageID() { - return messageID; - } - - @Override - public UUID getUserID() { - return userID; - } - - @Override - public MessageImpl setUserID(final UUID userID) { - this.userID = userID; - return this; - } - - /** - * this doesn't need to be synchronized as setAddress is protecting the buffer, - * not the address - */ - @Override - public SimpleString getAddress() { - return address; - } - - /** - * The only reason this is synchronized is because of encoding a message versus invalidating the buffer. - * This synchronization can probably be removed since setAddress is always called from a single thread. - * However I will keep it as it's harmless and it's been well tested - */ - @Override - public Message setAddress(final SimpleString address) { - // This is protecting the buffer - synchronized (this) { - if (this.address != address) { - this.address = address; - - bufferValid = false; - } - } - - return this; - } - - @Override - public byte getType() { - return type; - } - - public void setType(byte type) { - this.type = type; - } - - @Override - public boolean isDurable() { - return durable; - } - - @Override - public MessageImpl setDurable(final boolean durable) { - if (this.durable != durable) { - this.durable = durable; - - bufferValid = false; - } - return this; - } - - @Override - public long getExpiration() { - return expiration; - } - - @Override - public MessageImpl setExpiration(final long expiration) { - if (this.expiration != expiration) { - this.expiration = expiration; - - bufferValid = false; - } - return this; - } - - @Override - public long getTimestamp() { - return timestamp; - } - - @Override - public MessageImpl setTimestamp(final long timestamp) { - if (this.timestamp != timestamp) { - this.timestamp = timestamp; - - bufferValid = false; - } - return this; - } - - @Override - public byte getPriority() { - return priority; - } - - @Override - public MessageImpl setPriority(final byte priority) { - if (this.priority != priority) { - this.priority = priority; - - bufferValid = false; - } - return this; - } - - @Override - public boolean isExpired() { - if (expiration == 0) { - return false; - } - - return System.currentTimeMillis() - expiration >= 0; - } - - @Override - public Map<String, Object> toMap() { - Map<String, Object> map = new HashMap<>(); - - map.put("messageID", messageID); - if (userID != null) { - map.put("userID", "ID:" + userID.toString()); - } - map.put("address", address.toString()); - map.put("type", type); - map.put("durable", durable); - map.put("expiration", expiration); - map.put("timestamp", timestamp); - map.put("priority", priority); - map.putAll(toPropertyMap()); - return map; - } - - @Override - public Map<String, Object> toPropertyMap() { - Map<String, Object> map = new HashMap<>(); - for (SimpleString propName : properties.getPropertyNames()) { - map.put(propName.toString(), properties.getProperty(propName)); - } - return map; - } - - @Override - public void decodeFromBuffer(final ActiveMQBuffer buffer) { - - this.buffer = copyMessageBuffer(buffer); - - decode(); - - //synchronize indexes - buffer.setIndex(this.buffer.readerIndex(),this.buffer.writerIndex()); - - // Setting up the BodyBuffer based on endOfBodyPosition set from decode - ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, this.buffer, null); - tmpbodyBuffer.readerIndex(BODY_OFFSET); - tmpbodyBuffer.writerIndex(endOfBodyPosition); - // only set this after the writer and reader is set, - // otherwise the buffer would be reset through the listener - tmpbodyBuffer.setMessage(this); - this.bodyBuffer = tmpbodyBuffer; - - } - - private ActiveMQBuffer copyMessageBuffer(ActiveMQBuffer buffer) { - ActiveMQBuffer copiedBuffer; - - ByteBuf newNettyBuffer = Unpooled.buffer( buffer.byteBuf().capacity() ); - - int read = buffer.byteBuf().readerIndex(); - int writ = buffer.byteBuf().writerIndex(); - - int readArt = buffer.readerIndex(); - int writArt = buffer.writerIndex(); - buffer.byteBuf().readerIndex( 0 ); - - buffer.byteBuf().readBytes( newNettyBuffer, 0, buffer.byteBuf().writerIndex() ); - buffer.byteBuf().setIndex( read, writ ); - newNettyBuffer.setIndex( read, writ ); - - copiedBuffer = new ChannelBufferWrapper( newNettyBuffer ); - - buffer.setIndex( readArt, writArt ); - copiedBuffer.setIndex( readArt, writArt ); - - return copiedBuffer; - } - - @Override - public void bodyChanged() { - bufferValid = false; - - endOfBodyPosition = -1; - } - - @Override - public int getEndOfMessagePosition() { - return endOfMessagePosition; - } - - @Override - public int getEndOfBodyPosition() { - if (endOfBodyPosition < 0) { - endOfBodyPosition = getBodyBuffer().writerIndex(); - } - return endOfBodyPosition; - } - - // Encode to journal or paging - public void encode(final ActiveMQBuffer buff) { - encodeToBuffer(); - - buff.writeBytes(buffer, BUFFER_HEADER_SPACE, endOfMessagePosition - BUFFER_HEADER_SPACE); - } - - // Decode from journal or paging - public void decode(final ActiveMQBuffer buff) { - int start = buff.readerIndex(); - - endOfBodyPosition = buff.readInt(); - - endOfMessagePosition = buff.getInt(endOfBodyPosition - BUFFER_HEADER_SPACE + start); - - int length = endOfMessagePosition - BUFFER_HEADER_SPACE; - - buffer.setIndex(0, BUFFER_HEADER_SPACE); - - buffer.writeBytes(buff, start, length); - - decode(); - - buff.readerIndex(start + length); - } - - @Override - public synchronized ActiveMQBuffer getEncodedBuffer() { - ActiveMQBuffer buff = encodeToBuffer(); - return buff.duplicate(); - } - - @Override - public void setAddressTransient(final SimpleString address) { - this.address = address; - } - - // Properties - // --------------------------------------------------------------------------------------- - - @Override - public Message putBooleanProperty(final SimpleString key, final boolean value) { - properties.putBooleanProperty(key, value); - - bufferValid = false; - - return this; - } - - @Override - public Message putByteProperty(final SimpleString key, final byte value) { - properties.putByteProperty(key, value); - - bufferValid = false; - - return this; - } - - @Override - public Message putBytesProperty(final SimpleString key, final byte[] value) { - properties.putBytesProperty(key, value); - - bufferValid = false; - - return this; - } - - @Override - public Message putCharProperty(SimpleString key, char value) { - properties.putCharProperty(key, value); - bufferValid = false; - - return this; - } - - @Override - public Message putCharProperty(String key, char value) { - properties.putCharProperty(new SimpleString(key), value); - bufferValid = false; - - return this; - } - - @Override - public Message putShortProperty(final SimpleString key, final short value) { - properties.putShortProperty(key, value); - bufferValid = false; - - return this; - } - - @Override - public Message putIntProperty(final SimpleString key, final int value) { - properties.putIntProperty(key, value); - bufferValid = false; - - return this; - } - - @Override - public Message putLongProperty(final SimpleString key, final long value) { - properties.putLongProperty(key, value); - bufferValid = false; - - return this; - } - - @Override - public Message putFloatProperty(final SimpleString key, final float value) { - properties.putFloatProperty(key, value); - - bufferValid = false; - - return this; - } - - @Override - public Message putDoubleProperty(final SimpleString key, final double value) { - properties.putDoubleProperty(key, value); - - bufferValid = false; - - return this; - } - - @Override - public Message putStringProperty(final SimpleString key, final SimpleString value) { - properties.putSimpleStringProperty(key, value); - - bufferValid = false; - - return this; - } - - @Override - public Message putObjectProperty(final SimpleString key, - final Object value) throws ActiveMQPropertyConversionException { - TypedProperties.setObjectProperty(key, value, properties); - bufferValid = false; - - return this; - } - - @Override - public Message putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException { - putObjectProperty(new SimpleString(key), value); - - bufferValid = false; - - return this; - } - - @Override - public Message putBooleanProperty(final String key, final boolean value) { - properties.putBooleanProperty(new SimpleString(key), value); - - bufferValid = false; - - return this; - } - - @Override - public Message putByteProperty(final String key, final byte value) { - properties.putByteProperty(new SimpleString(key), value); - - bufferValid = false; - - return this; - } - - @Override - public Message putBytesProperty(final String key, final byte[] value) { - properties.putBytesProperty(new SimpleString(key), value); - - bufferValid = false; - - return this; - } - - @Override - public Message putShortProperty(final String key, final short value) { - properties.putShortProperty(new SimpleString(key), value); - - bufferValid = false; - - return this; - } - - @Override - public Message putIntProperty(final String key, final int value) { - properties.putIntProperty(new SimpleString(key), value); - - bufferValid = false; - - return this; - } - - @Override - public Message putLongProperty(final String key, final long value) { - properties.putLongProperty(new SimpleString(key), value); - - bufferValid = false; - - return this; - } - - @Override - public Message putFloatProperty(final String key, final float value) { - properties.putFloatProperty(new SimpleString(key), value); - - bufferValid = false; - - return this; - } - - @Override - public Message putDoubleProperty(final String key, final double value) { - properties.putDoubleProperty(new SimpleString(key), value); - - bufferValid = false; - - return this; - } - - @Override - public Message putStringProperty(final String key, final String value) { - properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value)); - - bufferValid = false; - - return this; - } - - public Message putTypedProperties(final TypedProperties otherProps) { - properties.putTypedProperties(otherProps); - - bufferValid = false; - - return this; - } - - @Override - public Object getObjectProperty(final SimpleString key) { - return properties.getProperty(key); - } - - @Override - public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - return properties.getBooleanProperty(key); - } - - @Override - public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException { - return properties.getBooleanProperty(new SimpleString(key)); - } - - @Override - public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - return properties.getByteProperty(key); - } - - @Override - public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException { - return properties.getByteProperty(new SimpleString(key)); - } - - @Override - public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - return properties.getBytesProperty(key); - } - - @Override - public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException { - return getBytesProperty(new SimpleString(key)); - } - - @Override - public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - return properties.getDoubleProperty(key); - } - - @Override - public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException { - return properties.getDoubleProperty(new SimpleString(key)); - } - - @Override - public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - return properties.getIntProperty(key); - } - - @Override - public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException { - return properties.getIntProperty(new SimpleString(key)); - } - - @Override - public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - return properties.getLongProperty(key); - } - - @Override - public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException { - return properties.getLongProperty(new SimpleString(key)); - } - - @Override - public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - return properties.getShortProperty(key); - } - - @Override - public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException { - return properties.getShortProperty(new SimpleString(key)); - } - - @Override - public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - return properties.getFloatProperty(key); - } - - @Override - public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException { - return properties.getFloatProperty(new SimpleString(key)); - } - - @Override - public String getStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - SimpleString str = getSimpleStringProperty(key); - - if (str == null) { - return null; - } else { - return str.toString(); - } - } - - @Override - public String getStringProperty(final String key) throws ActiveMQPropertyConversionException { - return getStringProperty(new SimpleString(key)); - } - - @Override - public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - return properties.getSimpleStringProperty(key); - } - - @Override - public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException { - return properties.getSimpleStringProperty(new SimpleString(key)); - } - - @Override - public Object getObjectProperty(final String key) { - return properties.getProperty(new SimpleString(key)); - } - - @Override - public Object removeProperty(final SimpleString key) { - bufferValid = false; - - return properties.removeProperty(key); - } - - @Override - public Object removeProperty(final String key) { - bufferValid = false; - - return properties.removeProperty(new SimpleString(key)); - } - - @Override - public boolean containsProperty(final SimpleString key) { - return properties.containsProperty(key); - } - - @Override - public boolean containsProperty(final String key) { - return properties.containsProperty(new SimpleString(key)); - } - - @Override - public Set<SimpleString> getPropertyNames() { - return properties.getPropertyNames(); - } - - @Override - public ActiveMQBuffer getWholeBuffer() { - return buffer; - } - - @Override - public BodyEncoder getBodyEncoder() throws ActiveMQException { - return new DecodingContext(); - } - - @Override - public TypedProperties getTypedProperties() { - return this.properties; - } - - @Override - public boolean equals(Object other) { - - if (this == other) { - return true; - } - - if (other instanceof MessageImpl) { - MessageImpl message = (MessageImpl) other; - - if (this.getMessageID() == message.getMessageID()) - return true; - } - - return false; - } - - /** - * Debug Helper!!!! - * - * I'm leaving this message here without any callers for a reason: - * During debugs it's important eventually to identify what's on the bodies, and this method will give you a good idea about them. - * Add the message.bodyToString() to the Watch variables on the debugger view and this will show up like a charm!!! - * - * @return - */ - public String bodyToString() { - getEndOfBodyPosition(); - int readerIndex1 = this.buffer.readerIndex(); - buffer.readerIndex(0); - byte[] buffer1 = new byte[buffer.writerIndex()]; - buffer.readBytes(buffer1); - buffer.readerIndex(readerIndex1); - - byte[] buffer2 = null; - if (bodyBuffer != null) { - int readerIndex2 = this.bodyBuffer.readerIndex(); - bodyBuffer.readerIndex(0); - buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()]; - bodyBuffer.readBytes(buffer2); - bodyBuffer.readerIndex(readerIndex2); - return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1); - } else { - return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1); - } - - } - - @Override - public int hashCode() { - return 31 + (int) (messageID ^ (messageID >>> 32)); - } - - // Public -------------------------------------------------------- - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - public TypedProperties getProperties() { - return properties; - } - - // This must be synchronized as it can be called concurrently id the message is being delivered - // concurrently to - // many queues - the first caller in this case will actually encode it - private synchronized ActiveMQBuffer encodeToBuffer() { - if (!bufferValid) { - int bodySize = getEndOfBodyPosition(); - - // write it - buffer.setInt(BUFFER_HEADER_SPACE, bodySize); - - // Position at end of body and skip past the message end position int. - // check for enough room in the buffer even though it is dynamic - if ((bodySize + 4) > buffer.capacity()) { - buffer.setIndex(0, bodySize); - buffer.writeInt(0); - } else { - buffer.setIndex(0, bodySize + DataConstants.SIZE_INT); - } - - encodeHeadersAndProperties(buffer); - - // Write end of message position - - endOfMessagePosition = buffer.writerIndex(); - - buffer.setInt(bodySize, endOfMessagePosition); - - bufferValid = true; - } - - return buffer; - } - - private void decode() { - endOfBodyPosition = buffer.getInt(BUFFER_HEADER_SPACE); - - buffer.readerIndex(endOfBodyPosition + DataConstants.SIZE_INT); - - decodeHeadersAndProperties(buffer); - - endOfMessagePosition = buffer.readerIndex(); - - bufferValid = true; - } - - public void createBody(final int initialMessageBufferSize) { - buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize); - - // There's a bug in netty which means a dynamic buffer won't resize until you write a byte - buffer.writeByte((byte) 0); - - buffer.setIndex(BODY_OFFSET, BODY_OFFSET); - } - - // Inner classes ------------------------------------------------- - - private final class DecodingContext implements BodyEncoder { - - private int lastPos = 0; - - private DecodingContext() { - } - - @Override - public void open() { - } - - @Override - public void close() { - } - - @Override - public long getLargeBodySize() { - return buffer.writerIndex(); - } - - @Override - public int encode(final ByteBuffer bufferRead) throws ActiveMQException { - ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead); - return encode(buffer, bufferRead.capacity()); - } - - @Override - public int encode(final ActiveMQBuffer bufferOut, final int size) { - bufferOut.writeBytes(getWholeBuffer(), lastPos, size); - lastPos += size; - return size; - } - } - -}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java deleted file mode 100644 index a7b2199..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.message.impl; - -import java.io.InputStream; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.BodyEncoder; -import org.apache.activemq.artemis.utils.TypedProperties; - -public interface MessageInternal extends Message { - - void decodeFromBuffer(ActiveMQBuffer buffer); - - int getEndOfMessagePosition(); - - int getEndOfBodyPosition(); - - void bodyChanged(); - - boolean isServerMessage(); - - ActiveMQBuffer getEncodedBuffer(); - - int getHeadersAndPropertiesEncodeSize(); - - ActiveMQBuffer getWholeBuffer(); - - void encodeHeadersAndProperties(ActiveMQBuffer buffer); - - void decodeHeadersAndProperties(ActiveMQBuffer buffer); - - BodyEncoder getBodyEncoder() throws ActiveMQException; - - InputStream getBodyInputStream(); - - void setAddressTransient(SimpleString address); - - TypedProperties getTypedProperties(); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index ae1cf71..9975a5b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientSession; @@ -45,7 +46,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; @@ -103,7 +104,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAR import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -422,12 +422,12 @@ public class ActiveMQSessionContext extends SessionContext { } @Override - public int getCreditsOnSendingFull(MessageInternal msgI) { + public int getCreditsOnSendingFull(Message msgI) { return msgI.getEncodeSize(); } @Override - public void sendFullMessage(MessageInternal msgI, + public void sendFullMessage(Message msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException { @@ -441,16 +441,16 @@ public class ActiveMQSessionContext extends SessionContext { } @Override - public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException { + public int sendInitialChunkOnLargeMessage(Message msgI) throws ActiveMQException { SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(msgI); sessionChannel.send(initialChunk); - return msgI.getHeadersAndPropertiesEncodeSize(); + return ((CoreMessage)msgI).getHeadersAndPropertiesEncodeSize(); } @Override - public int sendLargeMessageChunk(MessageInternal msgI, + public int sendLargeMessageChunk(Message msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, @@ -471,7 +471,7 @@ public class ActiveMQSessionContext extends SessionContext { } @Override - public int sendServerLargeMessageChunk(MessageInternal msgI, + public int sendServerLargeMessageChunk(Message msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 0f5cdf0..e95227d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -371,6 +371,7 @@ public final class ChannelImpl implements Channel { if (logger.isTraceEnabled()) { logger.trace("Sending blocking " + packet); } + connection.getTransportConnection().write(buffer, false, false); long toWait = connection.getBlockingCallTimeout(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 9025210..08c17e4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -16,8 +16,11 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; @@ -25,6 +28,7 @@ import org.apache.activemq.artemis.utils.DataConstants; public class PacketImpl implements Packet { // Constants ------------------------------------------------------------------------- + public static final int ADDRESSING_CHANGE_VERSION = 129; public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); @@ -310,7 +314,7 @@ public class PacketImpl implements Packet { @Override public ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) { - ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled); + ActiveMQBuffer buffer = createPacket(connection, usePooled); // The standard header fields @@ -330,6 +334,14 @@ public class PacketImpl implements Packet { return buffer; } + protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { + if (connection == null) { + return new ChannelBufferWrapper(Unpooled.buffer(INITIAL_PACKET_SIZE)); + } else { + return connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled); + } + } + @Override public void decode(final ActiveMQBuffer buffer) { channelID = buffer.readLong(); @@ -339,6 +351,22 @@ public class PacketImpl implements Packet { size = buffer.readerIndex(); } + protected ByteBuf copyMessageBuffer(ByteBuf buffer, int skipBytes) { + + ByteBuf newNettyBuffer = Unpooled.buffer(buffer.capacity() - PACKET_HEADERS_SIZE - skipBytes); + + int read = buffer.readerIndex(); + int writ = buffer.writerIndex(); + buffer.readerIndex(PACKET_HEADERS_SIZE); + + newNettyBuffer.writeBytes(buffer, buffer.readableBytes() - skipBytes); + buffer.setIndex( read, writ ); + newNettyBuffer.setIndex( 0, writ - PACKET_HEADERS_SIZE - skipBytes); + + return newNettyBuffer; + } + + @Override public int getPacketSize() { if (size == -1) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index 8bd62ca..cada061 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -353,6 +353,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement } dataReceived = true; + doBufferReceived(packet); super.bufferReceived(connectionID, buffer); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java index 6a52a27..ec2520a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java @@ -16,15 +16,19 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; public abstract class MessagePacket extends PacketImpl implements MessagePacketI { - protected MessageInternal message; + protected Message message; - public MessagePacket(final byte type, final MessageInternal message) { + public MessagePacket(final byte type, final Message message) { super(type); this.message = message; @@ -40,4 +44,12 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI return super.getParentString() + ", message=" + message; } + protected ActiveMQBuffer internalCreatePacket(int size, RemotingConnection connection, boolean usePooled) { + if (connection == null) { + return new ChannelBufferWrapper(Unpooled.buffer(size)); + } else { + return connection.createTransportBuffer(size, usePooled); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java index 66e509c..e9e3138 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java @@ -17,12 +17,13 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; public class SessionReceiveClientLargeMessage extends SessionReceiveLargeMessage { - public SessionReceiveClientLargeMessage(MessageInternal message) { + public SessionReceiveClientLargeMessage(Message message) { super(message); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java index 64f96f9..dc2c458 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java @@ -18,12 +18,12 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class SessionReceiveLargeMessage extends PacketImpl implements MessagePacketI { - private final MessageInternal message; + private final Message message; /** * Since we receive the message before the entire message was received, @@ -35,13 +35,13 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac private int deliveryCount; // To be used on decoding at the client while receiving a large message - public SessionReceiveLargeMessage(final MessageInternal message) { + public SessionReceiveLargeMessage(final Message message) { super(SESS_RECEIVE_LARGE_MSG); this.message = message; } public SessionReceiveLargeMessage(final long consumerID, - final MessageInternal message, + final Message message, final long largeMessageSize, final int deliveryCount) { super(SESS_RECEIVE_LARGE_MSG); @@ -55,7 +55,7 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac this.largeMessageSize = largeMessageSize; } - public MessageInternal getLargeMessage() { + public Message getLargeMessage() { return message; } @@ -85,7 +85,7 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac buffer.writeInt(deliveryCount); buffer.writeLong(largeMessageSize); if (message != null) { - message.encodeHeadersAndProperties(buffer); + ((CoreMessage)message).encodeHeadersAndProperties(buffer.byteBuf()); } } @@ -94,7 +94,7 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac consumerID = buffer.readLong(); deliveryCount = buffer.readInt(); largeMessageSize = buffer.readLong(); - message.decodeHeadersAndProperties(buffer); + ((CoreMessage)message).decodeHeadersAndProperties(buffer.byteBuf()); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java index c21ebda..c03d3c8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java @@ -17,7 +17,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; @@ -30,7 +31,7 @@ public class SessionReceiveMessage extends MessagePacket { private int deliveryCount; - public SessionReceiveMessage(final long consumerID, final MessageInternal message, final int deliveryCount) { + public SessionReceiveMessage(final long consumerID, final Message message, final int deliveryCount) { super(SESS_RECEIVE_MSG, message); this.consumerID = consumerID; @@ -38,7 +39,7 @@ public class SessionReceiveMessage extends MessagePacket { this.deliveryCount = deliveryCount; } - public SessionReceiveMessage(final MessageInternal message) { + public SessionReceiveMessage(final Message message) { super(SESS_RECEIVE_MSG, message); } @@ -53,53 +54,28 @@ public class SessionReceiveMessage extends MessagePacket { } @Override - public ActiveMQBuffer encode(final RemotingConnection connection) { - ActiveMQBuffer buffer = message.getEncodedBuffer(); - - ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true); - bufferWrite.writeBytes(buffer, 0, buffer.capacity()); - bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); - - // Sanity check - if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) { - throw new IllegalStateException("Wrong encode position"); - } - - bufferWrite.writeLong(consumerID); - bufferWrite.writeInt(deliveryCount); - - size = bufferWrite.writerIndex(); - - // Write standard headers - - int len = size - DataConstants.SIZE_INT; - bufferWrite.setInt(0, len); - bufferWrite.setByte(DataConstants.SIZE_INT, getType()); - bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID); - - // Position reader for reading by Netty - bufferWrite.setIndex(0, size); - - return bufferWrite; + protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { + return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, connection, usePooled); } @Override - public void decode(final ActiveMQBuffer buffer) { - channelID = buffer.readLong(); - - message.decodeFromBuffer(buffer); - - consumerID = buffer.readLong(); + public void encodeRest(ActiveMQBuffer buffer) { + message.sendBuffer(buffer.byteBuf(), deliveryCount); + buffer.writeLong(consumerID); + buffer.writeInt(deliveryCount); + } - deliveryCount = buffer.readInt(); + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + // Buffer comes in after having read standard headers and positioned at Beginning of body part - size = buffer.readerIndex(); + message.receiveBuffer(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT)); - // Need to position buffer for reading + buffer.readerIndex(buffer.capacity() - DataConstants.SIZE_LONG - DataConstants.SIZE_INT); + this.consumerID = buffer.readLong(); + this.deliveryCount = buffer.readInt(); - buffer.setIndex(PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition()); } - @Override public int hashCode() { final int prime = 31; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java index b4ec027..0ecfe33 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java @@ -17,8 +17,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; /** * A SessionSendContinuationMessage<br> @@ -28,7 +28,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { private boolean requiresResponse; // Used on confirmation handling - private MessageInternal message; + private Message message; /** * In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession} * <br> @@ -58,7 +58,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { * @param continues * @param requiresResponse */ - public SessionSendContinuationMessage(final MessageInternal message, + public SessionSendContinuationMessage(final Message message, final byte[] body, final boolean continues, final boolean requiresResponse, @@ -87,7 +87,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { /** * @return the message */ - public MessageInternal getMessage() { + public Message getMessage() { return message; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java index bf4290b..869940c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class SessionSendLargeMessage extends PacketImpl implements MessagePacketI { @@ -26,13 +26,13 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket /** * Used only if largeMessage */ - private final MessageInternal largeMessage; + private final Message largeMessage; // Static -------------------------------------------------------- // Constructors -------------------------------------------------- - public SessionSendLargeMessage(final MessageInternal largeMessage) { + public SessionSendLargeMessage(final Message largeMessage) { super(SESS_SEND_LARGE); this.largeMessage = largeMessage; @@ -40,7 +40,7 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket // Public -------------------------------------------------------- - public MessageInternal getLargeMessage() { + public Message getLargeMessage() { return largeMessage; } @@ -51,12 +51,12 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket @Override public void encodeRest(final ActiveMQBuffer buffer) { - largeMessage.encodeHeadersAndProperties(buffer); + ((CoreMessage)largeMessage).encodeHeadersAndProperties(buffer.byteBuf()); } @Override public void decodeRest(final ActiveMQBuffer buffer) { - largeMessage.decodeHeadersAndProperties(buffer); + ((CoreMessage)largeMessage).decodeHeadersAndProperties(buffer.byteBuf()); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java index c7bb30e..8182b90 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.utils.DataConstants; public class SessionSendMessage extends MessagePacket { @@ -36,7 +37,8 @@ public class SessionSendMessage extends MessagePacket { */ private final transient SendAcknowledgementHandler handler; - public SessionSendMessage(final MessageInternal message, + /** This will be using the CoreMessage because it is meant for the core-protocol */ + public SessionSendMessage(final Message message, final boolean requiresResponse, final SendAcknowledgementHandler handler) { super(SESS_SEND, message); @@ -44,7 +46,7 @@ public class SessionSendMessage extends MessagePacket { this.requiresResponse = requiresResponse; } - public SessionSendMessage(final MessageInternal message) { + public SessionSendMessage(final CoreMessage message) { super(SESS_SEND, message); this.handler = null; } @@ -60,53 +62,29 @@ public class SessionSendMessage extends MessagePacket { } @Override - public ActiveMQBuffer encode(final RemotingConnection connection) { - ActiveMQBuffer buffer = message.getEncodedBuffer(); - - ActiveMQBuffer bufferWrite; - if (connection == null) { - // this is for unit tests only - bufferWrite = buffer.copy(0, buffer.capacity()); - } else { - bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1, true); // 1 for the requireResponse - } - bufferWrite.writeBytes(buffer, 0, buffer.writerIndex()); - bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); - - // Sanity check - if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) { - throw new IllegalStateException("Wrong encode position"); - } - - bufferWrite.writeBoolean(requiresResponse); - - size = bufferWrite.writerIndex(); - - // Write standard headers + protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { + return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + 1, connection, usePooled); + } - int len = size - DataConstants.SIZE_INT; - bufferWrite.setInt(0, len); - bufferWrite.setByte(DataConstants.SIZE_INT, getType()); - bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID); + @Override + public void encodeRest(ActiveMQBuffer buffer) { + message.sendBuffer(buffer.byteBuf(), 0); + buffer.writeBoolean(requiresResponse); - // Position reader for reading by Netty - bufferWrite.readerIndex(0); - return bufferWrite; } @Override public void decodeRest(final ActiveMQBuffer buffer) { // Buffer comes in after having read standard headers and positioned at Beginning of body part - message.decodeFromBuffer(buffer); + ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1); + message.receiveBuffer(messageBuffer); - int ri = buffer.readerIndex(); + buffer.readerIndex(buffer.capacity() - 1); requiresResponse = buffer.readBoolean(); - buffer.readerIndex(ri); - } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java index 65aeccb..8560f5d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java @@ -26,7 +26,7 @@ public class MapMessageUtil extends MessageUtil { */ public static void writeBodyMap(ActiveMQBuffer message, TypedProperties properties) { message.resetWriterIndex(); - properties.encode(message); + properties.encode(message.byteBuf()); } /** @@ -43,7 +43,7 @@ public class MapMessageUtil extends MessageUtil { */ public static void readBodyMap(ActiveMQBuffer message, TypedProperties map) { message.resetReaderIndex(); - map.decode(message); + map.decode(message.byteBuf()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 72795b7..8bb0081 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.IDGenerator; @@ -128,9 +127,9 @@ public abstract class SessionContext { } - public abstract int getCreditsOnSendingFull(MessageInternal msgI); + public abstract int getCreditsOnSendingFull(Message msgI); - public abstract void sendFullMessage(MessageInternal msgI, + public abstract void sendFullMessage(Message msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException; @@ -142,9 +141,9 @@ public abstract class SessionContext { * @return * @throws ActiveMQException */ - public abstract int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException; + public abstract int sendInitialChunkOnLargeMessage(Message msgI) throws ActiveMQException; - public abstract int sendLargeMessageChunk(MessageInternal msgI, + public abstract int sendLargeMessageChunk(Message msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, @@ -152,7 +151,7 @@ public abstract class SessionContext { int reconnectID, SendAcknowledgementHandler messageHandler) throws ActiveMQException; - public abstract int sendServerLargeMessageChunk(MessageInternal msgI, + public abstract int sendServerLargeMessageChunk(Message msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java new file mode 100644 index 0000000..5e92eaf --- /dev/null +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.message; + +import java.util.LinkedList; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; +import org.apache.activemq.artemis.reader.TextMessageUtil; +import org.apache.activemq.artemis.utils.Base64; +import org.apache.activemq.artemis.utils.ByteUtil; +import org.apache.activemq.artemis.utils.UUID; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class CoreMessageTest { + + public static final SimpleString ADDRESS = new SimpleString("this.local.address"); + public static final byte MESSAGE_TYPE = Message.TEXT_TYPE; + public static final boolean DURABLE = true; + public static final long EXPIRATION = 123L; + public static final long TIMESTAMP = 321L; + public static final byte PRIORITY = (byte) 3; + public static final String TEXT = "hi"; + public static final String BIGGER_TEXT = "AAAAAAAAAAAAAAAAAAAAAAAAA ASDF ASDF ASF ASD ASF ASDF ASDF ASDF ASF ADSF ASDF"; + public static final String SMALLER_TEXT = "H"; + public static final UUID uuid = new UUID(UUID.TYPE_TIME_BASED, new byte[]{0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 1}); + public static final SimpleString PROP1_NAME = new SimpleString("t1"); + public static final SimpleString PROP1_VALUE = new SimpleString("value-t1"); + + /** + * This encode was generated by {@link #generate()}. + * Run it manually with a right-click on the IDE to eventually update it + * */ + // body = "hi"; + private final String STRING_ENCODE = "AAAAFgEAAAAEaABpAAAAAAAAAAAAAQAAACR0AGgAaQBzAC4AbABvAGMAYQBsAC4AYQBkAGQAcgBlAHMAcwAAAwEAAAAAAAAAewAAAAAAAAFBAwEAAAABAAAABHQAMQAKAAAAEHYAYQBsAHUAZQAtAHQAMQA="; + + private ByteBuf BYTE_ENCODE; + + + @Before + public void before() { + BYTE_ENCODE = Unpooled.wrappedBuffer(Base64.decode(STRING_ENCODE, Base64.DONT_BREAK_LINES | Base64.URL_SAFE)); + // some extra caution here, nothing else, to make sure we would get the same encoding back + Assert.assertEquals(STRING_ENCODE, encodeString(BYTE_ENCODE.array())); + BYTE_ENCODE.readerIndex(0).writerIndex(BYTE_ENCODE.capacity()); + } + + /** The message is received, then sent to the other side untouched */ + @Test + public void testPassThrough() { + CoreMessage decodedMessage = decodeMessage(); + + Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(decodedMessage.getReadOnlyBodyBuffer()).toString()); + } + + /** The message is received, then sent to the other side untouched */ + @Test + public void sendThroughPackets() { + CoreMessage decodedMessage = decodeMessage(); + + int encodeSize = decodedMessage.getEncodeSize(); + Assert.assertEquals(BYTE_ENCODE.capacity(), encodeSize); + + SessionSendMessage sendMessage = new SessionSendMessage(decodedMessage, true, null); + sendMessage.setChannelID(777); + + ActiveMQBuffer buffer = sendMessage.encode(null); + + byte[] byteArray = buffer.byteBuf().array(); + System.out.println("Sending " + ByteUtil.bytesToHex(buffer.toByteBuffer().array(), 1) + ", bytes = " + byteArray.length); + + buffer.readerIndex(5); + + SessionSendMessage sendMessageReceivedSent = new SessionSendMessage(new CoreMessage()); + + sendMessageReceivedSent.decode(buffer); + + Assert.assertEquals(encodeSize, sendMessageReceivedSent.getMessage().getEncodeSize()); + + Assert.assertTrue(sendMessageReceivedSent.isRequiresResponse()); + + Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(sendMessageReceivedSent.getMessage().getReadOnlyBodyBuffer()).toString()); + } + + /** The message is received, then sent to the other side untouched */ + @Test + public void sendThroughPacketsClient() { + CoreMessage decodedMessage = decodeMessage(); + + int encodeSize = decodedMessage.getEncodeSize(); + Assert.assertEquals(BYTE_ENCODE.capacity(), encodeSize); + + SessionReceiveMessage sendMessage = new SessionReceiveMessage(33, decodedMessage, 7); + sendMessage.setChannelID(777); + + ActiveMQBuffer buffer = sendMessage.encode(null); + + buffer.readerIndex(5); + + SessionReceiveMessage sendMessageReceivedSent = new SessionReceiveMessage(new CoreMessage()); + + sendMessageReceivedSent.decode(buffer); + + Assert.assertEquals(33, sendMessageReceivedSent.getConsumerID()); + + Assert.assertEquals(7, sendMessageReceivedSent.getDeliveryCount()); + + Assert.assertEquals(encodeSize, sendMessageReceivedSent.getMessage().getEncodeSize()); + + Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(sendMessageReceivedSent.getMessage().getReadOnlyBodyBuffer()).toString()); + } + + private CoreMessage decodeMessage() { + + ByteBuf newBuffer = Unpooled.buffer(BYTE_ENCODE.capacity()); + newBuffer.writeBytes(BYTE_ENCODE, 0, BYTE_ENCODE.writerIndex()); + + CoreMessage coreMessage = internalDecode(newBuffer); + + int encodeSize = coreMessage.getEncodeSize(); + + Assert.assertEquals(newBuffer.capacity(), encodeSize); + + Assert.assertEquals(ADDRESS, coreMessage.getAddressSimpleString()); + + Assert.assertEquals(PROP1_VALUE.toString(), coreMessage.getStringProperty(PROP1_NAME)); + + ByteBuf destinedBuffer = Unpooled.buffer(BYTE_ENCODE.array().length); + coreMessage.sendBuffer(destinedBuffer, 0); + + byte[] destinedArray = destinedBuffer.array(); + byte[] sourceArray = BYTE_ENCODE.array(); + + CoreMessage newDecoded = internalDecode(Unpooled.wrappedBuffer(destinedArray)); + + Assert.assertEquals(encodeSize, newDecoded.getEncodeSize()); + + Assert.assertArrayEquals(sourceArray, destinedArray); + + return coreMessage; + } + + private CoreMessage internalDecode(ByteBuf bufferOrigin) { + CoreMessage coreMessage = new CoreMessage(); +// System.out.println("Bytes from test " + ByteUtil.bytesToHex(bufferOrigin.array(), 1)); + coreMessage.receiveBuffer(bufferOrigin); + return coreMessage; + } + + /** The message is received, then sent to the other side untouched */ + @Test + public void testChangeBodyStringSameSize() { + testChangeBodyString(TEXT.toUpperCase()); + } + + @Test + public void testChangeBodyBiggerString() { + testChangeBodyString(BIGGER_TEXT); + } + + @Test + public void testGenerateEmpty() { + CoreMessage empty = new CoreMessage().initBuffer(100); + ByteBuf buffer = Unpooled.buffer(200); + empty.sendBuffer(buffer, 0); + + CoreMessage empty2 = new CoreMessage(); + empty2.receiveBuffer(buffer); + + try { + empty2.getBodyBuffer().readByte(); + Assert.fail("should throw exception"); + } catch (Exception expected) { + + } + } + + @Test + public void testSaveReceiveLimitedBytes() { + CoreMessage empty = new CoreMessage().initBuffer(100); + System.out.println("R " + empty.getBodyBuffer().readerIndex() + " W " + empty.getBodyBuffer().writerIndex()); + empty.getBodyBuffer().writeByte((byte)7); + System.out.println("R " + empty.getBodyBuffer().readerIndex() + " W " + empty.getBodyBuffer().writerIndex()); + + ByteBuf buffer = Unpooled.buffer(200); + empty.sendBuffer(buffer, 0); + + CoreMessage empty2 = new CoreMessage(); + empty2.receiveBuffer(buffer); + + Assert.assertEquals((byte)7, empty2.getBodyBuffer().readByte()); + + System.out.println("Readable :: " + empty2.getBodyBuffer().readerIndex() + " writer :" + empty2.getBodyBuffer().writerIndex()); + + try { + empty2.getBodyBuffer().readByte(); + Assert.fail("should throw exception"); + } catch (Exception expected) { + + } + } + + @Test + public void testChangeBodySmallerString() { + testChangeBodyString(SMALLER_TEXT); + } + + public void testChangeBodyString(String newString) { + CoreMessage coreMessage = decodeMessage(); + + coreMessage.putStringProperty("newProperty", "newValue"); + ActiveMQBuffer legacyBuffer = coreMessage.getBodyBuffer(); + legacyBuffer.resetWriterIndex(); + legacyBuffer.clear(); + + TextMessageUtil.writeBodyText(legacyBuffer, SimpleString.toSimpleString(newString)); + + ByteBuf newbuffer = Unpooled.buffer(150000); + + coreMessage.sendBuffer(newbuffer, 0); + newbuffer.readerIndex(0); + + CoreMessage newCoreMessage = new CoreMessage(); + newCoreMessage.receiveBuffer(newbuffer); + + + SimpleString newText = TextMessageUtil.readBodyText(newCoreMessage.getReadOnlyBodyBuffer()); + + Assert.assertEquals(newString, newText.toString()); + +// coreMessage.putStringProperty() + } + + @Test + public void testPassThroughMultipleThreads() throws Throwable { + CoreMessage coreMessage = new CoreMessage(); + coreMessage.receiveBuffer(BYTE_ENCODE); + + LinkedList<Throwable> errors = new LinkedList<>(); + + Thread[] threads = new Thread[50]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + try { + for (int j = 0; j < 50; j++) { + Assert.assertEquals(ADDRESS, coreMessage.getAddressSimpleString()); + Assert.assertEquals(PROP1_VALUE.toString(), coreMessage.getStringProperty(PROP1_NAME)); + + ByteBuf destinedBuffer = Unpooled.buffer(BYTE_ENCODE.array().length); + coreMessage.sendBuffer(destinedBuffer, 0); + + byte[] destinedArray = destinedBuffer.array(); + byte[] sourceArray = BYTE_ENCODE.array(); + + Assert.assertArrayEquals(sourceArray, destinedArray); + + Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(coreMessage.getReadOnlyBodyBuffer()).toString()); + } + } catch (Throwable e) { + e.printStackTrace(); + errors.add(e); + } + }); + } + + for (Thread t : threads) { + t.start(); + } + + for (Thread t : threads) { + t.join(); + } + + for (Throwable e: errors) { + throw e; + } + + } + + // This is to compare the original encoding with the current version + @Test + public void compareOriginal() throws Exception { + String generated = generate(TEXT); + + Assert.assertEquals(STRING_ENCODE, generated); + + for (int i = 0; i < generated.length(); i++) { + Assert.assertEquals("Chart at " + i + " was " + generated.charAt(i) + " instead of " + STRING_ENCODE.charAt(i), generated.charAt(i), STRING_ENCODE.charAt(i)); + } + } + + /** Use this method to update the encode for the known message */ + @Ignore + @Test + public void generate() throws Exception { + + printVariable(TEXT, generate(TEXT)); + printVariable(SMALLER_TEXT, generate(SMALLER_TEXT)); + printVariable(BIGGER_TEXT, generate(BIGGER_TEXT)); + + } + + private void printVariable(String body, String encode) { + System.out.println("// body = \"" + body + "\";"); + System.out.println("private final String STRING_ENCODE = \"" + encode + "\";"); + + } + + public String generate(String body) throws Exception { + + ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024); + TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body)); + + message.setAddress(ADDRESS); + message.setUserID(uuid); + message.getProperties().putSimpleStringProperty(PROP1_NAME, PROP1_VALUE); + + + ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(10 * 1024); + message.sendBuffer(buffer.byteBuf(), 0); + + byte[] bytes = new byte[buffer.byteBuf().writerIndex()]; + buffer.byteBuf().readBytes(bytes); + + return encodeString(bytes); + + // replace the code + + + } + + private String encodeString(byte[] bytes) { + return Base64.encodeBytes(bytes, 0, bytes.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + +}
