http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java new file mode 100644 index 0000000..f7821b9 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core.encode; + +import java.nio.ByteBuffer; + +public interface MessageBody { + Object getBody(); + + ByteBuffer getBodyArray(); + + BodyType getType(); +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java index 900305f..b5d5474 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java @@ -20,18 +20,18 @@ import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; /** * A ResetLimitWrappedActiveMQBuffer - * TODO: Move this to commons */ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper { private final int limit; - private MessageInternal message; + private Message message; /** * We need to turn of notifications of body changes on reset on the server side when dealing with AMQP conversions, @@ -39,17 +39,17 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper * * @param message */ - public void setMessage(MessageInternal message) { + public void setMessage(Message message) { this.message = message; } - public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) { + public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final Message message) { // a wrapped inside a wrapper will increase the stack size. // we fixed this here due to some profiling testing this(limit, unwrap(buffer.byteBuf()).duplicate(), message); } - public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final MessageInternal message) { + public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final Message message) { // a wrapped inside a wrapper will increase the stack size. // we fixed this here due to some profiling testing super(buffer); @@ -67,7 +67,7 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper private void changed() { if (message != null) { - message.bodyChanged(); + message.messageChanged(); } } @@ -94,8 +94,6 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper @Override public void resetReaderIndex() { - changed(); - buffer.readerIndex(limit); } @@ -256,6 +254,14 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper super.writeBytes(src); } + + @Override + public void writeBytes(final ByteBuf src, final int srcIndex, final int length) { + changed(); + + super.writeBytes(src, srcIndex, length); + } + @Override public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) { changed(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java index c3cbceb..cbfaf6f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java @@ -59,7 +59,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C @Override public int getEncodeSize() { - if (bodyBuffer != null) { + if (writableBuffer != null) { return super.getEncodeSize(); } else { return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize(); @@ -93,7 +93,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C throw new RuntimeException(e.getMessage(), e); } - return bodyBuffer; + return writableBuffer; } @Override @@ -108,7 +108,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C @Override public void saveToOutputStream(final OutputStream out) throws ActiveMQException { - if (bodyBuffer != null) { + if (writableBuffer != null) { // The body was rebuilt on the client, so we need to behave as a regular message on this case super.saveToOutputStream(out); } else { @@ -118,7 +118,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C @Override public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws ActiveMQException { - if (bodyBuffer != null) { + if (writableBuffer != null) { super.setOutputStream(out); } else { largeMessageController.setOutputStream(out); @@ -129,7 +129,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C @Override public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws ActiveMQException { - if (bodyBuffer != null) { + if (writableBuffer != null) { return super.waitOutputStreamCompletion(timeMilliseconds); } else { return largeMessageController.waitCompletion(timeMilliseconds); @@ -138,7 +138,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C @Override public void discardBody() { - if (bodyBuffer != null) { + if (writableBuffer != null) { super.discardBody(); } else { largeMessageController.discardUnusedPackets(); @@ -146,17 +146,17 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C } private void checkBuffer() throws ActiveMQException { - if (bodyBuffer == null) { + if (writableBuffer == null) { long bodySize = this.largeMessageSize + BODY_OFFSET; if (bodySize > Integer.MAX_VALUE) { bodySize = Integer.MAX_VALUE; } - createBody((int) bodySize); + initBuffer((int) bodySize); - bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this); + writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this); - largeMessageController.saveBuffer(new ActiveMQOutputStream(bodyBuffer)); + largeMessageController.saveBuffer(new ActiveMQOutputStream(writableBuffer)); } } @@ -178,7 +178,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C public void retrieveExistingData(ClientMessageInternal clMessage) { this.messageID = clMessage.getMessageID(); - this.address = clMessage.getAddress(); + this.address = clMessage.getAddressSimpleString(); this.setUserID(clMessage.getUserID()); this.setFlowControlSize(clMessage.getFlowControlSize()); this.setDeliveryCount(clMessage.getDeliveryCount()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java index 7bf8eb7..9472b01 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java @@ -28,14 +28,16 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; -import org.apache.activemq.artemis.core.message.BodyEncoder; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.activemq.artemis.utils.TypedProperties; +import org.apache.activemq.artemis.utils.UUID; /** * A ClientMessageImpl */ -public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal { +public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal { // added this constant here so that the client package have no dependency on JMS public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME; @@ -57,6 +59,35 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter public ClientMessageImpl() { } + protected ClientMessageImpl(ClientMessageImpl other) { + super(other); + } + + @Override + public ClientMessageImpl setDurable(boolean durable) { + super.setDurable(durable); + return this; + } + + @Override + public ClientMessageImpl setExpiration(long expiration) { + super.setExpiration(expiration); + return this; + } + + @Override + public ClientMessageImpl setPriority(byte priority) { + super.setPriority(priority); + return this; + } + + @Override + public ClientMessageImpl setUserID(UUID userID) { + + return this; + } + + /* * Construct messages before sending */ @@ -66,12 +97,18 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter final long timestamp, final byte priority, final int initialMessageBufferSize) { - super(type, durable, expiration, timestamp, priority, initialMessageBufferSize); + this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable). + setPriority(priority).initBuffer(initialMessageBufferSize); } @Override - public boolean isServerMessage() { - return false; + public void setAddressTransient(SimpleString address) { + this.address = address; + } + + @Override + public TypedProperties getProperties() { + return this.checkProperties(); } @Override @@ -108,6 +145,11 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter return this; } + + @Override + public void checkCompletion() throws ActiveMQException { + } + @Override public int getFlowControlSize() { if (flowControlSize < 0) { @@ -141,7 +183,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter @Override public String toString() { - return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]"; + return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + getProperties().toString() + "]"; } @Override @@ -189,7 +231,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter } @Override - public BodyEncoder getBodyEncoder() throws ActiveMQException { + public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { return new DecodingContext(); } @@ -307,15 +349,17 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter @Override public ClientMessageImpl writeBodyBufferBytes(byte[] bytes) { - return (ClientMessageImpl) super.writeBodyBufferBytes(bytes); + getBodyBuffer().writeBytes(bytes); + return this; } @Override public ClientMessageImpl writeBodyBufferString(String string) { - return (ClientMessageImpl) super.writeBodyBufferString(string); + getBodyBuffer().writeString(string); + return this; } - private final class DecodingContext implements BodyEncoder { + private final class DecodingContext implements LargeBodyEncoder { private DecodingContext() { } @@ -347,9 +391,15 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter @Override public int encode(final ActiveMQBuffer bufferOut, final int size) { byte[] bytes = new byte[size]; - getWholeBuffer().readBytes(bytes); + buffer.readBytes(bytes); bufferOut.writeBytes(bytes, 0, size); return size; } } + + @Override + public Message copy() { + return new ClientMessageImpl(this); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java index 07d4719..878f799 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.client.impl; +import java.io.InputStream; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.utils.TypedProperties; @@ -44,4 +46,7 @@ public interface ClientMessageInternal extends ClientMessage { void discardBody(); boolean isCompressed(); + + InputStream getBodyInputStream(); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index 1dfbe72..ce16011 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -27,8 +27,8 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; -import org.apache.activemq.artemis.core.message.BodyEncoder; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream; import org.apache.activemq.artemis.utils.DeflaterReader; @@ -217,7 +217,7 @@ public class ClientProducerImpl implements ClientProducerInternal { session.startCall(); try { - MessageInternal msgI = (MessageInternal) msg; + Message msgI = msg; ClientProducerCredits theCredits; @@ -225,8 +225,8 @@ public class ClientProducerImpl implements ClientProducerInternal { // a note about the second check on the writerIndexSize, // If it's a server's message, it means this is being done through the bridge or some special consumer on the // server's on which case we can't' convert the message into large at the servers - if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || - msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())) { + if (sessionContext.supportsLargeMessage() && (getBodyInputStream(msgI) != null || msgI.isLargeMessage() || + msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)) { isLarge = true; } else { isLarge = false; @@ -258,7 +258,7 @@ public class ClientProducerImpl implements ClientProducerInternal { session.workDone(); if (isLarge) { - largeMessageSend(sendBlocking, msgI, theCredits, handler); + largeMessageSend(sendBlocking, (CoreMessage)msgI, theCredits, handler); } else { sendRegularMessage(sendingAddress, msgI, sendBlocking, theCredits, handler); } @@ -267,8 +267,12 @@ public class ClientProducerImpl implements ClientProducerInternal { } } + private InputStream getBodyInputStream(Message msgI) { + return ((ClientMessageInternal)msgI).getBodyInputStream(); + } + private void sendRegularMessage(final SimpleString sendingAddress, - final MessageInternal msgI, + final Message msgI, final boolean sendBlocking, final ClientProducerCredits theCredits, final SendAcknowledgementHandler handler) throws ActiveMQException { @@ -301,7 +305,7 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSend(final boolean sendBlocking, - final MessageInternal msgI, + final CoreMessage msgI, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { logger.tracef("largeMessageSend::%s, Blocking=%s", msgI, sendBlocking); @@ -313,22 +317,22 @@ public class ClientProducerImpl implements ClientProducerInternal { } // msg.getBody() could be Null on LargeServerMessage - if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null) { - msgI.getWholeBuffer().readerIndex(0); + if (getBodyInputStream(msgI) == null && msgI.getBuffer() != null) { + msgI.getBuffer().readerIndex(0); } InputStream input; if (msgI.isServerMessage()) { largeMessageSendServer(sendBlocking, msgI, credits, handler); - } else if ((input = msgI.getBodyInputStream()) != null) { + } else if ((input = getBodyInputStream(msgI)) != null) { largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler); } else { largeMessageSendBuffered(sendBlocking, msgI, credits, handler); } } - private void sendInitialLargeMessageHeader(MessageInternal msgI, + private void sendInitialLargeMessageHeader(Message msgI, ClientProducerCredits credits) throws ActiveMQException { int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI); @@ -348,17 +352,14 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSendServer(final boolean sendBlocking, - final MessageInternal msgI, + final Message msgI, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { sendInitialLargeMessageHeader(msgI, credits); - BodyEncoder context = msgI.getBodyEncoder(); + LargeBodyEncoder context = msgI.getBodyEncoder(); final long bodySize = context.getLargeBodySize(); - - final int reconnectID = sessionContext.getReconnectID(); - context.open(); try { @@ -392,7 +393,7 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSendBuffered(final boolean sendBlocking, - final MessageInternal msgI, + final Message msgI, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { msgI.getBodyBuffer().readerIndex(0); @@ -407,7 +408,7 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSendStreamed(final boolean sendBlocking, - final MessageInternal msgI, + final Message msgI, final InputStream inputStreamParameter, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { @@ -478,7 +479,7 @@ public class ClientProducerImpl implements ClientProducerInternal { msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize()); msgI.getBodyBuffer().writeBytes(buff, 0, pos); - sendRegularMessage(msgI.getAddress(), msgI, sendBlocking, credits, handler); + sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler); return; } else { if (!headerSent) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java index 55f9129..ce652d2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java @@ -513,6 +513,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll } @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + + @Override public ByteBuffer toByteBuffer() { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index 951aea2..0bb5690 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -863,6 +863,21 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + /** + * Transfers the specified source buffer's data to this buffer starting at + * the current {@code writerIndex} until the source buffer's position + * reaches its limit, and increases the {@code writerIndex} by the + * number of the transferred bytes. + * + * @param src The source buffer + * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than + * {@code this.writableBytes} + */ + @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + public int writeBytes(final InputStream in, final int length) throws IOException { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java deleted file mode 100644 index baafaac..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.message; - -import java.nio.ByteBuffer; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; - -/** - * Class used to encode message body into buffers. - * <br> - * Used to send large streams over the wire - */ -public interface BodyEncoder { - - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - void open() throws ActiveMQException; - - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - void close() throws ActiveMQException; - - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - int encode(ByteBuffer bufferRead) throws ActiveMQException; - - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException; - - /** - * This method must not be called directly by ActiveMQ Artemis clients. - */ - long getLargeBodySize(); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java new file mode 100644 index 0000000..8b96282 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.message; + +import java.nio.ByteBuffer; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; + +/** + * Class used to encode message body into buffers. + * <br> + * Used to send large streams over the wire + */ +public interface LargeBodyEncoder { + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + void open() throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + void close() throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + int encode(ByteBuffer bufferRead) throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException; + + /** + * This method must not be called directly by ActiveMQ Artemis clients. + */ + long getLargeBodySize(); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java new file mode 100644 index 0000000..fd09751 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -0,0 +1,1066 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.message.impl; + +import java.nio.ByteBuffer; +import java.util.Set; + +import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RefCountMessage; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.encode.BodyType; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.artemis.utils.TypedProperties; +import org.apache.activemq.artemis.utils.UUID; +import org.jboss.logging.Logger; + +/** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple + * consumers */ +public class CoreMessage extends RefCountMessage { + + public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE; + + // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties + // Note, it is only an estimate, it's not possible to be entirely sure with Java + // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof + // The value is somewhat higher on 64 bit architectures, probably due to different alignment + private static final int memoryOffset = 352; + + private volatile int memoryEstimate = -1; + + + private static final Logger logger = Logger.getLogger(CoreMessage.class); + + // There's an integer with the number of bytes for the body + public static final int BODY_OFFSET = DataConstants.SIZE_INT; + + /** That is the encode for the whole message, including properties.. + it does not include the buffer for the Packet send and receive header on core protocol */ + protected ByteBuf buffer; + + private volatile boolean validBuffer = false; + + protected volatile ResetLimitWrappedActiveMQBuffer writableBuffer; + + Object body; + + protected int endOfBodyPosition = -1; + + protected int messageIDPosition = -1; + + protected long messageID; + + protected SimpleString address; + + protected byte type; + + protected boolean durable; + + /** + * GMT milliseconds at which this message expires. 0 means never expires * + */ + private long expiration; + + protected long timestamp; + + protected byte priority; + + private UUID userID; + + private int propertiesLocation = -1; + + protected volatile TypedProperties properties; + + private Object protocol; + + public CoreMessage() { + } + + @Override + public CoreMessage setProtocol(Object protocol) { + this.protocol = protocol; + return this; + } + + @Override + public Object getProtocol() { + return protocol; + } + + @Override + public Persister<Message> getPersister() { + return CoreMessagePersister.getInstance(); + } + + public CoreMessage initBuffer(final int initialMessageBufferSize) { + buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize).byteBuf(); + + // There's a bug in netty which means a dynamic buffer won't resize until you write a byte + buffer.writeByte((byte) 0); + + buffer.setIndex(BODY_OFFSET, BODY_OFFSET); + + return this; + } + + @Override + public void receiveBuffer(ByteBuf buffer) { + this.buffer = buffer; + this.buffer.retain(); + decode(); + this.validBuffer = true; + } + + @Override + public ActiveMQBuffer getReadOnlyBodyBuffer() { + internalWritableBuffer(); + return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly()); + } + + /** + * + * @param sendBuffer + * @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core + */ + @Override + public void sendBuffer(ByteBuf sendBuffer, int deliveryCount) { + checkEncode(); + sendBuffer.writeBytes(buffer, 0, buffer.writerIndex()); + } + + private synchronized void checkEncode() { + if (!validBuffer) { + encode(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public ActiveMQBuffer getBodyBuffer() { + // if using the writable buffer, we must parse properties + checkProperties(); + + internalWritableBuffer(); + + return writableBuffer; + } + + private void internalWritableBuffer() { + if (writableBuffer == null) { + writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this); + if (endOfBodyPosition > 0) { + writableBuffer.byteBuf().setIndex(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE + BODY_OFFSET); + writableBuffer.resetReaderIndex(); + } + } + } + + public int getEndOfBodyPosition() { + if (endOfBodyPosition < 0) { + endOfBodyPosition = getBodyBuffer().writerIndex(); + } + return endOfBodyPosition; + } + + + public TypedProperties getTypedProperties() { + return checkProperties(); + } + + + @Override + public void messageChanged() { + validBuffer = false; + } + + protected CoreMessage(CoreMessage other) { + this(other, other.properties); + } + + public CoreMessage(long id, int bufferSize) { + this.initBuffer(bufferSize); + this.setMessageID(id); + } + + protected CoreMessage(CoreMessage other, TypedProperties copyProperties) { + this.body = other.body; + this.endOfBodyPosition = other.endOfBodyPosition; + this.messageID = other.messageID; + this.address = other.address; + this.type = other.type; + this.durable = other.durable; + this.expiration = other.expiration; + this.timestamp = other.timestamp; + this.priority = other.priority; + this.userID = other.userID; + if (copyProperties != null) { + this.properties = new TypedProperties(copyProperties); + } + if (other.buffer != null) { + this.buffer = other.buffer.copy(); + } + } + + @Override + public void copyHeadersAndProperties(final Message msg) { + messageID = msg.getMessageID(); + address = msg.getAddressSimpleString(); + userID = (UUID)msg.getUserID(); + type = msg.getType(); + durable = msg.isDurable(); + expiration = msg.getExpiration(); + timestamp = msg.getTimestamp(); + priority = msg.getPriority(); + + if (msg instanceof CoreMessage) { + properties = ((CoreMessage)msg).getTypedProperties(); + } else { + // TODO-now: copy stuff + logger.warn("Must implement copyHeaderAndProperties for other messages"); + } + } + + + @Override + public Message copy() { + return new CoreMessage(this); + } + + @Override + public Message copy(long newID) { + return copy().setMessageID(newID); + } + + @Override + public long getExpiration() { + return expiration; + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public CoreMessage setTimestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + @Override + public long getMessageID() { + return messageID; + } + + @Override + public byte getPriority() { + return priority; + } + + @Override + public UUID getUserID() { + return userID; + } + + @Override + public CoreMessage setUserID(Object uuid) { + this.userID = (UUID)uuid; + return this; + } + + @Override + public CoreMessage setMessageID(long messageID) { + this.messageID = messageID; + if (messageIDPosition >= 0 && validBuffer) { + buffer.setLong(messageIDPosition, messageID); + } + return this; + } + + @Override + public CoreMessage setAddress(SimpleString address) { + if (validBuffer && !address.equals(this.address)) { + messageChanged(); + } + this.address = address; + return this; + } + + @Override + public SimpleString getAddressSimpleString() { + return address; + } + + + @Override + public CoreMessage setExpiration(long expiration) { + this.expiration = expiration; + return this; + } + + @Override + public CoreMessage setPriority(byte priority) { + this.priority = priority; + return this; + } + + public CoreMessage setUserID(UUID userID) { + this.userID = userID; + return this; + } + + /** + * I am keeping this synchronized as the decode of the Properties is lazy + */ + protected TypedProperties checkProperties() { + if (properties == null) { + TypedProperties properties = new TypedProperties(); + if (buffer != null && propertiesLocation >= 0) { + properties.decode(buffer.duplicate().readerIndex(propertiesLocation)); + } + this.properties = properties; + } + + return this.properties; + } + + + @Override + public int getMemoryEstimate() { + if (memoryEstimate == -1) { + if (buffer == null) { + new Exception("It is null").printStackTrace(); + } + if (properties == null) { + new Exception("Properties It is null").printStackTrace(); + } + memoryEstimate = memoryOffset + + (buffer != null ? buffer.capacity() : 0) + + (properties != null ? properties.getMemoryOffset() : 0); + } + + return memoryEstimate; + } + + @Override + public byte getType() { + return type; + } + + @Override + public CoreMessage setType(byte type) { + this.type = type; + return this; + } + + private void decode() { + endOfBodyPosition = buffer.readInt(); + + buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE); + + decodeHeadersAndProperties(buffer, true); + buffer.readerIndex(0); + + internalWritableBuffer(); + } + + + public void decodeHeadersAndProperties(final ByteBuf buffer) { + decodeHeadersAndProperties(buffer, false); + } + + private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) { + messageIDPosition = buffer.readerIndex(); + messageID = buffer.readLong(); + + address = SimpleString.readNullableSimpleString(buffer); + if (buffer.readByte() == DataConstants.NOT_NULL) { + byte[] bytes = new byte[16]; + buffer.readBytes(bytes); + userID = new UUID(UUID.TYPE_TIME_BASED, bytes); + } else { + userID = null; + } + type = buffer.readByte(); + durable = buffer.readBoolean(); + expiration = buffer.readLong(); + timestamp = buffer.readLong(); + priority = buffer.readByte(); + if (lazyProperties) { + properties = null; + propertiesLocation = buffer.readerIndex(); + } else { + properties = new TypedProperties(); + properties.decode(buffer); + } + } + + + public synchronized CoreMessage encode() { + + checkProperties(); + + if (writableBuffer != null) { + // The message encode takes into consideration the PacketImpl which is not part of this encoding + // so we always need to take the BUFFER_HEADER_SPACE from packet impl into consideration + endOfBodyPosition = writableBuffer.writerIndex() + BUFFER_HEADER_SPACE - 4; + } else if (endOfBodyPosition <= 0) { + endOfBodyPosition = BUFFER_HEADER_SPACE; + } + + buffer.setIndex(0, 0); + buffer.writeInt(endOfBodyPosition); + + // The end of body position + buffer.writerIndex(endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT); + + encodeHeadersAndProperties(buffer); + + validBuffer = true; + + return this; + } + + public void encodeHeadersAndProperties(final ByteBuf buffer) { + checkProperties(); + messageIDPosition = buffer.writerIndex(); + buffer.writeLong(messageID); + SimpleString.writeNullableSimpleString(buffer, address); + if (userID == null) { + buffer.writeByte(DataConstants.NULL); + } else { + buffer.writeByte(DataConstants.NOT_NULL); + buffer.writeBytes(userID.asBytes()); + } + buffer.writeByte(type); + buffer.writeBoolean(durable); + buffer.writeLong(expiration); + buffer.writeLong(timestamp); + buffer.writeByte(priority); + properties.encode(buffer); + } + + @Override + public Object getBody() { + + if (body == null) { + decodeBody(); + } + + return body; + } + + private void decodeBody() { + buffer.readerIndex(DataConstants.SIZE_INT); + switch (getBodyType()) { + case Text: + body = SimpleString.readNullableSimpleString(buffer); + break; + + default: + break; + } + } + + public int getHeadersAndPropertiesEncodeSize() { + return DataConstants.SIZE_LONG + // Message ID + DataConstants.SIZE_BYTE + // user id null? + (userID == null ? 0 : 16) + + /* address */SimpleString.sizeofNullableString(address) + + DataConstants./* Type */SIZE_BYTE + + DataConstants./* Durable */SIZE_BOOLEAN + + DataConstants./* Expiration */SIZE_LONG + + DataConstants./* Timestamp */SIZE_LONG + + DataConstants./* Priority */SIZE_BYTE + + /* PropertySize and Properties */checkProperties().getEncodeSize(); + } + + @Override + public BodyType getBodyType() { + return getBodyType(type); + } + + public static BodyType getBodyType(byte type) { + switch (type) { + + case Message.DEFAULT_TYPE: + return BodyType.Undefined; + + case Message.OBJECT_TYPE: + return BodyType.Object; + + case Message.TEXT_TYPE: + return BodyType.Text; + + case Message.BYTES_TYPE: + return BodyType.Text; + + case Message.MAP_TYPE: + return BodyType.Map; + + case Message.STREAM_TYPE: + return BodyType.Stream; + + default: + return BodyType.Undefined; + + } + } + + @Override + public int getEncodeSize() { + checkEncode(); + return buffer == null ? -1 : buffer.writerIndex(); + } + + @Override + public CoreMessage setBody(final BodyType bodyType, Object body) { + messageChanged(); + + this.type = Message.TEXT_TYPE; + this.body = body; + + return this; + } + + @Override + public boolean isLargeMessage() { + return false; + } + + private void encodeBody(ByteBuf intoBuffer) { + intoBuffer.writerIndex(DataConstants.SIZE_INT); + + switch (getBodyType()) { + + // TODO-now implement other types + case Text: + SimpleString.writeNullableSimpleString(intoBuffer, SimpleString.toSimpleString(body == null ? null : body.toString())); + break; + + default: + break; + } + + + endOfBodyPosition = buffer.writerIndex() + BUFFER_HEADER_SPACE; + buffer.setInt(0, endOfBodyPosition); + } + + @Override + public String getAddress() { + if (address == null) { + return null; + } else { + return address.toString(); + } + } + + @Override + public CoreMessage setAddress(String address) { + messageChanged(); + this.address = SimpleString.toSimpleString(address); + return this; + } + + @Override + public CoreMessage setBuffer(ByteBuf buffer) { + this.buffer = buffer; + + return this; + } + + @Override + public ByteBuf getBuffer() { + return buffer; + } + + @Override + public boolean isDurable() { + return durable; + } + + @Override + public CoreMessage setDurable(boolean durable) { + messageChanged(); + this.durable = durable; + return this; + } + + + @Override + public CoreMessage putBooleanProperty(final String key, final boolean value) { + messageChanged(); + checkProperties(); + properties.putBooleanProperty(new SimpleString(key), value); + return this; + } + + @Override + public CoreMessage putBooleanProperty(final SimpleString key, final boolean value) { + messageChanged(); + checkProperties(); + properties.putBooleanProperty(key, value); + return this; + } + + @Override + public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getBooleanProperty(key); + } + + @Override + public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getBooleanProperty(new SimpleString(key)); + } + + + @Override + public CoreMessage putByteProperty(final SimpleString key, final byte value) { + messageChanged(); + checkProperties(); + properties.putByteProperty(key, value); + return this; + } + + + @Override + public CoreMessage putByteProperty(final String key, final byte value) { + messageChanged(); + checkProperties(); + properties.putByteProperty(new SimpleString(key), value); + + return this; + } + + + @Override + public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getByteProperty(key); + } + + @Override + public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException { + return getByteProperty(SimpleString.toSimpleString(key)); + } + + @Override + public CoreMessage putBytesProperty(final SimpleString key, final byte[] value) { + messageChanged(); + checkProperties(); + properties.putBytesProperty(key, value); + + return this; + } + + @Override + public CoreMessage putBytesProperty(final String key, final byte[] value) { + messageChanged(); + checkProperties(); + properties.putBytesProperty(new SimpleString(key), value); + return this; + } + + + @Override + public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getBytesProperty(key); + } + + @Override + public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException { + return getBytesProperty(new SimpleString(key)); + } + + @Override + public CoreMessage putCharProperty(SimpleString key, char value) { + messageChanged(); + checkProperties(); + properties.putCharProperty(key, value); + return this; + } + + @Override + public CoreMessage putCharProperty(String key, char value) { + messageChanged(); + checkProperties(); + properties.putCharProperty(new SimpleString(key), value); + return this; + } + + @Override + public CoreMessage putShortProperty(final SimpleString key, final short value) { + messageChanged(); + checkProperties(); + properties.putShortProperty(key, value); + return this; + } + + @Override + public CoreMessage putShortProperty(final String key, final short value) { + messageChanged(); + checkProperties(); + properties.putShortProperty(new SimpleString(key), value); + return this; + } + + + @Override + public CoreMessage putIntProperty(final SimpleString key, final int value) { + messageChanged(); + checkProperties(); + properties.putIntProperty(key, value); + return this; + } + + @Override + public CoreMessage putIntProperty(final String key, final int value) { + messageChanged(); + checkProperties(); + properties.putIntProperty(new SimpleString(key), value); + return this; + } + + @Override + public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getIntProperty(key); + } + + @Override + public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException { + return getIntProperty(SimpleString.toSimpleString(key)); + } + + + @Override + public CoreMessage putLongProperty(final SimpleString key, final long value) { + messageChanged(); + checkProperties(); + properties.putLongProperty(key, value); + return this; + } + + @Override + public CoreMessage putLongProperty(final String key, final long value) { + messageChanged(); + checkProperties(); + properties.putLongProperty(new SimpleString(key), value); + return this; + } + + @Override + public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getLongProperty(key); + } + + @Override + public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException { + checkProperties(); + return getLongProperty(SimpleString.toSimpleString(key)); + } + + + @Override + public CoreMessage putFloatProperty(final SimpleString key, final float value) { + messageChanged(); + checkProperties(); + properties.putFloatProperty(key, value); + return this; + } + + @Override + public CoreMessage putFloatProperty(final String key, final float value) { + messageChanged(); + checkProperties(); + properties.putFloatProperty(new SimpleString(key), value); + return this; + } + + @Override + public CoreMessage putDoubleProperty(final SimpleString key, final double value) { + messageChanged(); + checkProperties(); + properties.putDoubleProperty(key, value); + return this; + } + + @Override + public CoreMessage putDoubleProperty(final String key, final double value) { + messageChanged(); + checkProperties(); + properties.putDoubleProperty(new SimpleString(key), value); + return this; + } + + + @Override + public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException { + messageChanged(); + checkProperties(); + return properties.getDoubleProperty(key); + } + + @Override + public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException { + checkProperties(); + return getDoubleProperty(SimpleString.toSimpleString(key)); + } + + @Override + public CoreMessage putStringProperty(final SimpleString key, final SimpleString value) { + messageChanged(); + checkProperties(); + properties.putSimpleStringProperty(key, value); + return this; + } + + @Override + public CoreMessage putStringProperty(final String key, final String value) { + messageChanged(); + checkProperties(); + properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value)); + return this; + } + + @Override + public CoreMessage putObjectProperty(final SimpleString key, + final Object value) throws ActiveMQPropertyConversionException { + messageChanged(); + checkProperties(); + TypedProperties.setObjectProperty(key, value, properties); + return this; + } + + @Override + public Object getObjectProperty(final String key) { + checkProperties(); + return getObjectProperty(SimpleString.toSimpleString(key)); + } + + @Override + public Object getObjectProperty(final SimpleString key) { + checkProperties(); + return properties.getProperty(key); + } + + @Override + public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException { + messageChanged(); + putObjectProperty(new SimpleString(key), value); + return this; + } + + @Override + public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getShortProperty(key); + } + + @Override + public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getShortProperty(new SimpleString(key)); + } + + @Override + public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getFloatProperty(key); + } + + @Override + public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getFloatProperty(new SimpleString(key)); + } + + @Override + public String getStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException { + SimpleString str = getSimpleStringProperty(key); + + if (str == null) { + return null; + } else { + return str.toString(); + } + } + + @Override + public String getStringProperty(final String key) throws ActiveMQPropertyConversionException { + return getStringProperty(new SimpleString(key)); + } + + @Override + public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getSimpleStringProperty(key); + } + + @Override + public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException { + checkProperties(); + return properties.getSimpleStringProperty(new SimpleString(key)); + } + + @Override + public Object removeProperty(final SimpleString key) { + checkProperties(); + Object oldValue = properties.removeProperty(key); + if (oldValue != null) { + messageChanged(); + } + return oldValue; + } + + @Override + public Object removeProperty(final String key) { + messageChanged(); + checkProperties(); + Object oldValue = properties.removeProperty(new SimpleString(key)); + if (oldValue != null) { + messageChanged(); + } + return oldValue; + } + + @Override + public boolean containsProperty(final SimpleString key) { + checkProperties(); + return properties.containsProperty(key); + } + + @Override + public boolean containsProperty(final String key) { + checkProperties(); + return properties.containsProperty(new SimpleString(key)); + } + + @Override + public Set<SimpleString> getPropertyNames() { + checkProperties(); + return properties.getPropertyNames(); + } + + @Override + public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { + return new DecodingContext(); + } + + private final class DecodingContext implements LargeBodyEncoder { + + private int lastPos = 0; + + private DecodingContext() { + } + + @Override + public void open() { + } + + @Override + public void close() { + } + + @Override + public long getLargeBodySize() { + return buffer.writerIndex(); + } + + @Override + public int encode(final ByteBuffer bufferRead) throws ActiveMQException { + ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead); + return encode(buffer, bufferRead.capacity()); + } + + @Override + public int encode(final ActiveMQBuffer bufferOut, final int size) { + bufferOut.byteBuf().writeBytes(buffer, lastPos, size); + lastPos += size; + return size; + } + } + + @Override + public int getPersistSize() { + checkEncode(); + return buffer.writerIndex() + DataConstants.SIZE_INT; + } + + @Override + public void persist(ActiveMQBuffer targetRecord) { + targetRecord.writeInt(buffer.writerIndex()); + targetRecord.writeBytes(buffer, 0, buffer.writerIndex()); + } + + @Override + public void reloadPersistence(ActiveMQBuffer record) { + int size = record.readInt(); + initBuffer(size); + buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size); + decode(); + + } + + @Override + public Message toCore() { + return this; + } + + + + @Override + public String toString() { + try { + return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this); + } catch (Throwable e) { + return "ServerMessage[messageID=" + messageID + "]"; + } + } + + + private static String toDate(long timestamp) { + if (timestamp == 0) { + return "0"; + } else { + return new java.util.Date(timestamp).toString(); + } + + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java new file mode 100644 index 0000000..ddf39d2 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.message.impl; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.utils.DataConstants; + +public class CoreMessagePersister implements Persister<Message> { + + public static CoreMessagePersister theInstance = new CoreMessagePersister(); + + public static CoreMessagePersister getInstance() { + return theInstance; + } + + protected CoreMessagePersister() { + } + + + @Override + public int getEncodeSize(Message record) { + return DataConstants.SIZE_BYTE + record.getPersistSize() + + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG; + } + + + /** Sub classes must add the first short as the protocol-id */ + @Override + public void encode(ActiveMQBuffer buffer, Message record) { + buffer.writeByte((byte)1); + buffer.writeLong(record.getMessageID()); + buffer.writeNullableSimpleString(record.getAddressSimpleString()); + record.persist(buffer); + } + + + @Override + public Message decode(ActiveMQBuffer buffer, Message record) { + // the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use + long id = buffer.readLong(); + SimpleString address = buffer.readNullableSimpleString(); + record = new CoreMessage(); + record.reloadPersistence(buffer); + record.setMessageID(id); + record.setAddress(address); + return record; + } +}
