http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java index c21ebda..b0ab52b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java @@ -17,7 +17,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; @@ -30,7 +31,7 @@ public class SessionReceiveMessage extends MessagePacket { private int deliveryCount; - public SessionReceiveMessage(final long consumerID, final MessageInternal message, final int deliveryCount) { + public SessionReceiveMessage(final long consumerID, final ICoreMessage message, final int deliveryCount) { super(SESS_RECEIVE_MSG, message); this.consumerID = consumerID; @@ -38,7 +39,7 @@ public class SessionReceiveMessage extends MessagePacket { this.deliveryCount = deliveryCount; } - public SessionReceiveMessage(final MessageInternal message) { + public SessionReceiveMessage(final CoreMessage message) { super(SESS_RECEIVE_MSG, message); } @@ -53,53 +54,28 @@ public class SessionReceiveMessage extends MessagePacket { } @Override - public ActiveMQBuffer encode(final RemotingConnection connection) { - ActiveMQBuffer buffer = message.getEncodedBuffer(); - - ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true); - bufferWrite.writeBytes(buffer, 0, buffer.capacity()); - bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); - - // Sanity check - if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) { - throw new IllegalStateException("Wrong encode position"); - } - - bufferWrite.writeLong(consumerID); - bufferWrite.writeInt(deliveryCount); - - size = bufferWrite.writerIndex(); - - // Write standard headers - - int len = size - DataConstants.SIZE_INT; - bufferWrite.setInt(0, len); - bufferWrite.setByte(DataConstants.SIZE_INT, getType()); - bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID); - - // Position reader for reading by Netty - bufferWrite.setIndex(0, size); - - return bufferWrite; + protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { + return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, connection, usePooled); } @Override - public void decode(final ActiveMQBuffer buffer) { - channelID = buffer.readLong(); - - message.decodeFromBuffer(buffer); - - consumerID = buffer.readLong(); + public void encodeRest(ActiveMQBuffer buffer) { + message.sendBuffer(buffer.byteBuf(), deliveryCount); + buffer.writeLong(consumerID); + buffer.writeInt(deliveryCount); + } - deliveryCount = buffer.readInt(); + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + // Buffer comes in after having read standard headers and positioned at Beginning of body part - size = buffer.readerIndex(); + message.receiveBuffer(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT)); - // Need to position buffer for reading + buffer.readerIndex(buffer.capacity() - DataConstants.SIZE_LONG - DataConstants.SIZE_INT); + this.consumerID = buffer.readLong(); + this.deliveryCount = buffer.readInt(); - buffer.setIndex(PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition()); } - @Override public int hashCode() { final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java index b4ec027..0ecfe33 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java @@ -17,8 +17,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; /** * A SessionSendContinuationMessage<br> @@ -28,7 +28,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { private boolean requiresResponse; // Used on confirmation handling - private MessageInternal message; + private Message message; /** * In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession} * <br> @@ -58,7 +58,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { * @param continues * @param requiresResponse */ - public SessionSendContinuationMessage(final MessageInternal message, + public SessionSendContinuationMessage(final Message message, final byte[] body, final boolean continues, final boolean requiresResponse, @@ -87,7 +87,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { /** * @return the message */ - public MessageInternal getMessage() { + public Message getMessage() { return message; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java index bf4290b..869940c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class SessionSendLargeMessage extends PacketImpl implements MessagePacketI { @@ -26,13 +26,13 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket /** * Used only if largeMessage */ - private final MessageInternal largeMessage; + private final Message largeMessage; // Static -------------------------------------------------------- // Constructors -------------------------------------------------- - public SessionSendLargeMessage(final MessageInternal largeMessage) { + public SessionSendLargeMessage(final Message largeMessage) { super(SESS_SEND_LARGE); this.largeMessage = largeMessage; @@ -40,7 +40,7 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket // Public -------------------------------------------------------- - public MessageInternal getLargeMessage() { + public Message getLargeMessage() { return largeMessage; } @@ -51,12 +51,12 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket @Override public void encodeRest(final ActiveMQBuffer buffer) { - largeMessage.encodeHeadersAndProperties(buffer); + ((CoreMessage)largeMessage).encodeHeadersAndProperties(buffer.byteBuf()); } @Override public void decodeRest(final ActiveMQBuffer buffer) { - largeMessage.decodeHeadersAndProperties(buffer); + ((CoreMessage)largeMessage).decodeHeadersAndProperties(buffer.byteBuf()); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java index c7bb30e..43bb0be 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.utils.DataConstants; public class SessionSendMessage extends MessagePacket { @@ -36,7 +37,8 @@ public class SessionSendMessage extends MessagePacket { */ private final transient SendAcknowledgementHandler handler; - public SessionSendMessage(final MessageInternal message, + /** This will be using the CoreMessage because it is meant for the core-protocol */ + public SessionSendMessage(final ICoreMessage message, final boolean requiresResponse, final SendAcknowledgementHandler handler) { super(SESS_SEND, message); @@ -44,7 +46,7 @@ public class SessionSendMessage extends MessagePacket { this.requiresResponse = requiresResponse; } - public SessionSendMessage(final MessageInternal message) { + public SessionSendMessage(final CoreMessage message) { super(SESS_SEND, message); this.handler = null; } @@ -60,53 +62,29 @@ public class SessionSendMessage extends MessagePacket { } @Override - public ActiveMQBuffer encode(final RemotingConnection connection) { - ActiveMQBuffer buffer = message.getEncodedBuffer(); - - ActiveMQBuffer bufferWrite; - if (connection == null) { - // this is for unit tests only - bufferWrite = buffer.copy(0, buffer.capacity()); - } else { - bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1, true); // 1 for the requireResponse - } - bufferWrite.writeBytes(buffer, 0, buffer.writerIndex()); - bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); - - // Sanity check - if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) { - throw new IllegalStateException("Wrong encode position"); - } - - bufferWrite.writeBoolean(requiresResponse); - - size = bufferWrite.writerIndex(); - - // Write standard headers + protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { + return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + 1, connection, usePooled); + } - int len = size - DataConstants.SIZE_INT; - bufferWrite.setInt(0, len); - bufferWrite.setByte(DataConstants.SIZE_INT, getType()); - bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID); + @Override + public void encodeRest(ActiveMQBuffer buffer) { + message.sendBuffer(buffer.byteBuf(), 0); + buffer.writeBoolean(requiresResponse); - // Position reader for reading by Netty - bufferWrite.readerIndex(0); - return bufferWrite; } @Override public void decodeRest(final ActiveMQBuffer buffer) { // Buffer comes in after having read standard headers and positioned at Beginning of body part - message.decodeFromBuffer(buffer); + ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1); + message.receiveBuffer(messageBuffer); - int ri = buffer.readerIndex(); + buffer.readerIndex(buffer.capacity() - 1); requiresResponse = buffer.readBoolean(); - buffer.readerIndex(ri); - } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java index 65aeccb..8560f5d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java @@ -26,7 +26,7 @@ public class MapMessageUtil extends MessageUtil { */ public static void writeBodyMap(ActiveMQBuffer message, TypedProperties properties) { message.resetWriterIndex(); - properties.encode(message); + properties.encode(message.byteBuf()); } /** @@ -43,7 +43,7 @@ public class MapMessageUtil extends MessageUtil { */ public static void readBodyMap(ActiveMQBuffer message, TypedProperties map) { message.resetReaderIndex(); - map.decode(message); + map.decode(message.byteBuf()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 72795b7..3fddb8e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -23,7 +23,9 @@ import java.util.Set; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientSession; @@ -33,8 +35,6 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; @@ -128,9 +128,9 @@ public abstract class SessionContext { } - public abstract int getCreditsOnSendingFull(MessageInternal msgI); + public abstract int getCreditsOnSendingFull(Message msgI); - public abstract void sendFullMessage(MessageInternal msgI, + public abstract void sendFullMessage(ICoreMessage msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException; @@ -142,9 +142,9 @@ public abstract class SessionContext { * @return * @throws ActiveMQException */ - public abstract int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException; + public abstract int sendInitialChunkOnLargeMessage(Message msgI) throws ActiveMQException; - public abstract int sendLargeMessageChunk(MessageInternal msgI, + public abstract int sendLargeMessageChunk(Message msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, @@ -152,7 +152,7 @@ public abstract class SessionContext { int reconnectID, SendAcknowledgementHandler messageHandler) throws ActiveMQException; - public abstract int sendServerLargeMessageChunk(MessageInternal msgI, + public abstract int sendServerLargeMessageChunk(Message msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java new file mode 100644 index 0000000..5e92eaf --- /dev/null +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.message; + +import java.util.LinkedList; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; +import org.apache.activemq.artemis.reader.TextMessageUtil; +import org.apache.activemq.artemis.utils.Base64; +import org.apache.activemq.artemis.utils.ByteUtil; +import org.apache.activemq.artemis.utils.UUID; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class CoreMessageTest { + + public static final SimpleString ADDRESS = new SimpleString("this.local.address"); + public static final byte MESSAGE_TYPE = Message.TEXT_TYPE; + public static final boolean DURABLE = true; + public static final long EXPIRATION = 123L; + public static final long TIMESTAMP = 321L; + public static final byte PRIORITY = (byte) 3; + public static final String TEXT = "hi"; + public static final String BIGGER_TEXT = "AAAAAAAAAAAAAAAAAAAAAAAAA ASDF ASDF ASF ASD ASF ASDF ASDF ASDF ASF ADSF ASDF"; + public static final String SMALLER_TEXT = "H"; + public static final UUID uuid = new UUID(UUID.TYPE_TIME_BASED, new byte[]{0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 1}); + public static final SimpleString PROP1_NAME = new SimpleString("t1"); + public static final SimpleString PROP1_VALUE = new SimpleString("value-t1"); + + /** + * This encode was generated by {@link #generate()}. + * Run it manually with a right-click on the IDE to eventually update it + * */ + // body = "hi"; + private final String STRING_ENCODE = "AAAAFgEAAAAEaABpAAAAAAAAAAAAAQAAACR0AGgAaQBzAC4AbABvAGMAYQBsAC4AYQBkAGQAcgBlAHMAcwAAAwEAAAAAAAAAewAAAAAAAAFBAwEAAAABAAAABHQAMQAKAAAAEHYAYQBsAHUAZQAtAHQAMQA="; + + private ByteBuf BYTE_ENCODE; + + + @Before + public void before() { + BYTE_ENCODE = Unpooled.wrappedBuffer(Base64.decode(STRING_ENCODE, Base64.DONT_BREAK_LINES | Base64.URL_SAFE)); + // some extra caution here, nothing else, to make sure we would get the same encoding back + Assert.assertEquals(STRING_ENCODE, encodeString(BYTE_ENCODE.array())); + BYTE_ENCODE.readerIndex(0).writerIndex(BYTE_ENCODE.capacity()); + } + + /** The message is received, then sent to the other side untouched */ + @Test + public void testPassThrough() { + CoreMessage decodedMessage = decodeMessage(); + + Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(decodedMessage.getReadOnlyBodyBuffer()).toString()); + } + + /** The message is received, then sent to the other side untouched */ + @Test + public void sendThroughPackets() { + CoreMessage decodedMessage = decodeMessage(); + + int encodeSize = decodedMessage.getEncodeSize(); + Assert.assertEquals(BYTE_ENCODE.capacity(), encodeSize); + + SessionSendMessage sendMessage = new SessionSendMessage(decodedMessage, true, null); + sendMessage.setChannelID(777); + + ActiveMQBuffer buffer = sendMessage.encode(null); + + byte[] byteArray = buffer.byteBuf().array(); + System.out.println("Sending " + ByteUtil.bytesToHex(buffer.toByteBuffer().array(), 1) + ", bytes = " + byteArray.length); + + buffer.readerIndex(5); + + SessionSendMessage sendMessageReceivedSent = new SessionSendMessage(new CoreMessage()); + + sendMessageReceivedSent.decode(buffer); + + Assert.assertEquals(encodeSize, sendMessageReceivedSent.getMessage().getEncodeSize()); + + Assert.assertTrue(sendMessageReceivedSent.isRequiresResponse()); + + Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(sendMessageReceivedSent.getMessage().getReadOnlyBodyBuffer()).toString()); + } + + /** The message is received, then sent to the other side untouched */ + @Test + public void sendThroughPacketsClient() { + CoreMessage decodedMessage = decodeMessage(); + + int encodeSize = decodedMessage.getEncodeSize(); + Assert.assertEquals(BYTE_ENCODE.capacity(), encodeSize); + + SessionReceiveMessage sendMessage = new SessionReceiveMessage(33, decodedMessage, 7); + sendMessage.setChannelID(777); + + ActiveMQBuffer buffer = sendMessage.encode(null); + + buffer.readerIndex(5); + + SessionReceiveMessage sendMessageReceivedSent = new SessionReceiveMessage(new CoreMessage()); + + sendMessageReceivedSent.decode(buffer); + + Assert.assertEquals(33, sendMessageReceivedSent.getConsumerID()); + + Assert.assertEquals(7, sendMessageReceivedSent.getDeliveryCount()); + + Assert.assertEquals(encodeSize, sendMessageReceivedSent.getMessage().getEncodeSize()); + + Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(sendMessageReceivedSent.getMessage().getReadOnlyBodyBuffer()).toString()); + } + + private CoreMessage decodeMessage() { + + ByteBuf newBuffer = Unpooled.buffer(BYTE_ENCODE.capacity()); + newBuffer.writeBytes(BYTE_ENCODE, 0, BYTE_ENCODE.writerIndex()); + + CoreMessage coreMessage = internalDecode(newBuffer); + + int encodeSize = coreMessage.getEncodeSize(); + + Assert.assertEquals(newBuffer.capacity(), encodeSize); + + Assert.assertEquals(ADDRESS, coreMessage.getAddressSimpleString()); + + Assert.assertEquals(PROP1_VALUE.toString(), coreMessage.getStringProperty(PROP1_NAME)); + + ByteBuf destinedBuffer = Unpooled.buffer(BYTE_ENCODE.array().length); + coreMessage.sendBuffer(destinedBuffer, 0); + + byte[] destinedArray = destinedBuffer.array(); + byte[] sourceArray = BYTE_ENCODE.array(); + + CoreMessage newDecoded = internalDecode(Unpooled.wrappedBuffer(destinedArray)); + + Assert.assertEquals(encodeSize, newDecoded.getEncodeSize()); + + Assert.assertArrayEquals(sourceArray, destinedArray); + + return coreMessage; + } + + private CoreMessage internalDecode(ByteBuf bufferOrigin) { + CoreMessage coreMessage = new CoreMessage(); +// System.out.println("Bytes from test " + ByteUtil.bytesToHex(bufferOrigin.array(), 1)); + coreMessage.receiveBuffer(bufferOrigin); + return coreMessage; + } + + /** The message is received, then sent to the other side untouched */ + @Test + public void testChangeBodyStringSameSize() { + testChangeBodyString(TEXT.toUpperCase()); + } + + @Test + public void testChangeBodyBiggerString() { + testChangeBodyString(BIGGER_TEXT); + } + + @Test + public void testGenerateEmpty() { + CoreMessage empty = new CoreMessage().initBuffer(100); + ByteBuf buffer = Unpooled.buffer(200); + empty.sendBuffer(buffer, 0); + + CoreMessage empty2 = new CoreMessage(); + empty2.receiveBuffer(buffer); + + try { + empty2.getBodyBuffer().readByte(); + Assert.fail("should throw exception"); + } catch (Exception expected) { + + } + } + + @Test + public void testSaveReceiveLimitedBytes() { + CoreMessage empty = new CoreMessage().initBuffer(100); + System.out.println("R " + empty.getBodyBuffer().readerIndex() + " W " + empty.getBodyBuffer().writerIndex()); + empty.getBodyBuffer().writeByte((byte)7); + System.out.println("R " + empty.getBodyBuffer().readerIndex() + " W " + empty.getBodyBuffer().writerIndex()); + + ByteBuf buffer = Unpooled.buffer(200); + empty.sendBuffer(buffer, 0); + + CoreMessage empty2 = new CoreMessage(); + empty2.receiveBuffer(buffer); + + Assert.assertEquals((byte)7, empty2.getBodyBuffer().readByte()); + + System.out.println("Readable :: " + empty2.getBodyBuffer().readerIndex() + " writer :" + empty2.getBodyBuffer().writerIndex()); + + try { + empty2.getBodyBuffer().readByte(); + Assert.fail("should throw exception"); + } catch (Exception expected) { + + } + } + + @Test + public void testChangeBodySmallerString() { + testChangeBodyString(SMALLER_TEXT); + } + + public void testChangeBodyString(String newString) { + CoreMessage coreMessage = decodeMessage(); + + coreMessage.putStringProperty("newProperty", "newValue"); + ActiveMQBuffer legacyBuffer = coreMessage.getBodyBuffer(); + legacyBuffer.resetWriterIndex(); + legacyBuffer.clear(); + + TextMessageUtil.writeBodyText(legacyBuffer, SimpleString.toSimpleString(newString)); + + ByteBuf newbuffer = Unpooled.buffer(150000); + + coreMessage.sendBuffer(newbuffer, 0); + newbuffer.readerIndex(0); + + CoreMessage newCoreMessage = new CoreMessage(); + newCoreMessage.receiveBuffer(newbuffer); + + + SimpleString newText = TextMessageUtil.readBodyText(newCoreMessage.getReadOnlyBodyBuffer()); + + Assert.assertEquals(newString, newText.toString()); + +// coreMessage.putStringProperty() + } + + @Test + public void testPassThroughMultipleThreads() throws Throwable { + CoreMessage coreMessage = new CoreMessage(); + coreMessage.receiveBuffer(BYTE_ENCODE); + + LinkedList<Throwable> errors = new LinkedList<>(); + + Thread[] threads = new Thread[50]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + try { + for (int j = 0; j < 50; j++) { + Assert.assertEquals(ADDRESS, coreMessage.getAddressSimpleString()); + Assert.assertEquals(PROP1_VALUE.toString(), coreMessage.getStringProperty(PROP1_NAME)); + + ByteBuf destinedBuffer = Unpooled.buffer(BYTE_ENCODE.array().length); + coreMessage.sendBuffer(destinedBuffer, 0); + + byte[] destinedArray = destinedBuffer.array(); + byte[] sourceArray = BYTE_ENCODE.array(); + + Assert.assertArrayEquals(sourceArray, destinedArray); + + Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(coreMessage.getReadOnlyBodyBuffer()).toString()); + } + } catch (Throwable e) { + e.printStackTrace(); + errors.add(e); + } + }); + } + + for (Thread t : threads) { + t.start(); + } + + for (Thread t : threads) { + t.join(); + } + + for (Throwable e: errors) { + throw e; + } + + } + + // This is to compare the original encoding with the current version + @Test + public void compareOriginal() throws Exception { + String generated = generate(TEXT); + + Assert.assertEquals(STRING_ENCODE, generated); + + for (int i = 0; i < generated.length(); i++) { + Assert.assertEquals("Chart at " + i + " was " + generated.charAt(i) + " instead of " + STRING_ENCODE.charAt(i), generated.charAt(i), STRING_ENCODE.charAt(i)); + } + } + + /** Use this method to update the encode for the known message */ + @Ignore + @Test + public void generate() throws Exception { + + printVariable(TEXT, generate(TEXT)); + printVariable(SMALLER_TEXT, generate(SMALLER_TEXT)); + printVariable(BIGGER_TEXT, generate(BIGGER_TEXT)); + + } + + private void printVariable(String body, String encode) { + System.out.println("// body = \"" + body + "\";"); + System.out.println("private final String STRING_ENCODE = \"" + encode + "\";"); + + } + + public String generate(String body) throws Exception { + + ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024); + TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body)); + + message.setAddress(ADDRESS); + message.setUserID(uuid); + message.getProperties().putSimpleStringProperty(PROP1_NAME, PROP1_VALUE); + + + ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(10 * 1024); + message.sendBuffer(buffer.byteBuf(), 0); + + byte[] bytes = new byte[buffer.byteBuf().writerIndex()]; + buffer.byteBuf().readBytes(bytes); + + return encodeString(bytes); + + // replace the code + + + } + + private String encodeString(byte[] bytes) { + return Base64.encodeBytes(bytes, 0, bytes.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index eb7cda1..2108be7 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -31,11 +31,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; @@ -366,10 +368,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setSync(sync); appendRecord(r); } @@ -377,12 +379,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @Override public void appendAddRecord(long id, byte recordType, - EncodingSupport record, + Persister persister, + Object record, boolean sync, IOCompletion completionCallback) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setSync(sync); r.setIoCompletion(completionCallback); appendRecord(r); @@ -398,10 +401,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + public void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setSync(sync); appendRecord(r); } @@ -409,12 +412,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @Override public void appendUpdateRecord(long id, byte recordType, - EncodingSupport record, + Persister persister, + Object record, boolean sync, IOCompletion completionCallback) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setSync(sync); r.setIoCompletion(completionCallback); appendRecord(r); @@ -448,10 +452,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { public void appendAddRecordTransactional(long txID, long id, byte recordType, - EncodingSupport record) throws Exception { + Persister persister, + Object record) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setTxId(txID); appendRecord(r); } @@ -469,10 +474,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { public void appendUpdateRecordTransactional(long txID, long id, byte recordType, - EncodingSupport record) throws Exception { + Persister persister, + Object record) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setTxId(txID); appendRecord(r); } @@ -488,7 +494,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @Override public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); - r.setRecord(record); + r.setRecord(EncoderPersister.getInstance(), record); r.setTxId(txID); appendRecord(r); } @@ -685,10 +691,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public void perfBlast(int pages) { - } - - @Override public void runDirectJournalBlast() throws Exception { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index 9691d3e..b094164 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream; @@ -237,11 +238,11 @@ class JDBCJournalRecord { this.record = record; } - public void setRecord(EncodingSupport record) { - this.variableSize = record.getEncodeSize(); + public void setRecord(Persister persister, Object record) { + this.variableSize = persister.getEncodeSize(record); ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize); - record.encode(encodedBuffer); + persister.encode(encodedBuffer, record); this.record = new ActiveMQBufferInputStream(encodedBuffer); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java index 0e99106..4d0306b 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.jms.management; import javax.jms.JMSException; import javax.jms.Message; +import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.jms.client.ActiveMQMessage; @@ -27,7 +28,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQMessage; */ public class JMSManagementHelper { - private static org.apache.activemq.artemis.api.core.Message getCoreMessage(final Message jmsMessage) { + private static ClientMessage getCoreMessage(final Message jmsMessage) { if (jmsMessage instanceof ActiveMQMessage == false) { throw new IllegalArgumentException("Cannot send a foreign message as a management message " + jmsMessage.getClass().getName()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java index 59f04e8..6da3912 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java @@ -26,7 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean; @@ -374,7 +374,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag if (bodyLength == 0) return null; byte[] dst = new byte[bodyLength]; - message.getBodyBuffer().getBytes(MessageImpl.BODY_OFFSET, dst); + message.getBodyBuffer().getBytes(CoreMessage.BODY_OFFSET, dst); return (T) dst; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index 47dcfb2..80a07ac 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -43,7 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.UUID; @@ -293,7 +293,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public String getJMSMessageID() { if (msgID == null) { - UUID uid = message.getUserID(); + UUID uid = (UUID)message.getUserID(); msgID = uid == null ? null : "ID:" + uid.toString(); } @@ -397,7 +397,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public Destination getJMSDestination() throws JMSException { if (dest == null) { - SimpleString address = message.getAddress(); + SimpleString address = message.getAddressSimpleString(); String prefix = ""; if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) { RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)); @@ -756,7 +756,7 @@ public class ActiveMQMessage implements javax.jms.Message { @SuppressWarnings("unchecked") protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException { - InputStream is = ((MessageInternal) message).getBodyInputStream(); + InputStream is = ((ClientMessageInternal) message).getBodyInputStream(); try { ObjectInputStream ois = new ObjectInputStream(is); return (T) ois.readObject(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java index 6cf20ff..ecb4ccb 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java @@ -19,7 +19,8 @@ package org.apache.activemq.artemis.jms.transaction; import javax.transaction.xa.Xid; import java.util.Map; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionDetail; import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; @@ -36,8 +37,11 @@ public class JMSTransactionDetail extends TransactionDetail { } @Override - public String decodeMessageType(ServerMessage msg) { - int type = msg.getType(); + public String decodeMessageType(Message msg) { + if (!(msg instanceof ICoreMessage)) { + return "N/A"; + } + int type = ((ICoreMessage) msg).getType(); switch (type) { case ActiveMQMessage.TYPE: // 0 return "Default"; @@ -57,7 +61,7 @@ public class JMSTransactionDetail extends TransactionDetail { } @Override - public Map<String, Object> decodeMessageProperties(ServerMessage msg) { + public Map<String, Object> decodeMessageProperties(Message msg) { try { return ActiveMQMessage.coreMaptoJMSMap(msg.toMap()); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java new file mode 100644 index 0000000..8fc2a5aa --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.journal; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.persistence.Persister; + +/** This is a facade between the new Persister and the former EncodingSupport. + * Methods using the old interface will use this as a facade to provide the previous semantic. */ +public class EncoderPersister implements Persister<EncodingSupport> { + + private static final EncoderPersister theInstance = new EncoderPersister(); + + private EncoderPersister() { + } + + public static EncoderPersister getInstance() { + return theInstance; + } + + @Override + public int getEncodeSize(EncodingSupport record) { + return record.getEncodeSize(); + } + + @Override + public void encode(ActiveMQBuffer buffer, EncodingSupport record) { + record.encode(buffer); + } + + @Override + public EncodingSupport decode(ActiveMQBuffer buffer, EncodingSupport record) { + record.decode(buffer); + return record; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index fbd4182..ca194b8 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalFile; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.ActiveMQComponent; /** @@ -60,23 +61,49 @@ public interface Journal extends ActiveMQComponent { void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; - void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception; + default void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync); + } + + void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; void appendAddRecord(long id, byte recordType, - EncodingSupport record, + Persister persister, + Object record, boolean sync, IOCompletion completionCallback) throws Exception; + default void appendAddRecord(long id, + byte recordType, + EncodingSupport record, + boolean sync, + IOCompletion completionCallback) throws Exception { + appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); + } + void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; - void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception; + default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync); + } - void appendUpdateRecord(long id, + void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; + + default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, - IOCompletion completionCallback) throws Exception; + IOCompletion completionCallback) throws Exception { + appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); + } + + void appendUpdateRecord(final long id, + final byte recordType, + final Persister persister, + final Object record, + final boolean sync, + final IOCompletion callback) throws Exception; void appendDeleteRecord(long id, boolean sync) throws Exception; @@ -86,11 +113,23 @@ public interface Journal extends ActiveMQComponent { void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception; - void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception; + default void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception { + appendAddRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record); + } + + void appendAddRecordTransactional(final long txID, + final long id, + final byte recordType, + final Persister persister, + final Object record) throws Exception; void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception; - void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception; + default void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception { + appendUpdateRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record); + } + + void appendUpdateRecordTransactional(long txID, long id, byte recordType, Persister persister, Object record) throws Exception; void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception; @@ -165,8 +204,6 @@ public interface Journal extends ActiveMQComponent { int getUserVersion(); - void perfBlast(int pages); - void runDirectJournalBlast() throws Exception; /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java index 8bbecd2..943077c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; @@ -127,7 +128,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback } } - JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, new ByteArrayEncoding(filesToRename.toByteBuffer().array())); + JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(filesToRename.toByteBuffer().array())); ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 0b702a5..8e5ca2c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; @@ -90,10 +91,11 @@ public final class FileWrapperJournal extends JournalBase { @Override public void appendAddRecord(long id, byte recordType, - EncodingSupport record, + Persister persister, + Object record, boolean sync, IOCompletion callback) throws Exception { - JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record); + JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record); writeRecord(addRecord, sync, callback); } @@ -144,19 +146,21 @@ public final class FileWrapperJournal extends JournalBase { public void appendAddRecordTransactional(long txID, long id, byte recordType, - EncodingSupport record) throws Exception { + Persister persister, + Object record) throws Exception { count(txID); - JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record); + JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record); writeRecord(addRecord, false, null); } @Override public void appendUpdateRecord(long id, byte recordType, - EncodingSupport record, + Persister persister, + Object record, boolean sync, IOCompletion callback) throws Exception { - JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record); + JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record); writeRecord(updateRecord, sync, callback); } @@ -164,9 +168,10 @@ public final class FileWrapperJournal extends JournalBase { public void appendUpdateRecordTransactional(long txID, long id, byte recordType, - EncodingSupport record) throws Exception { + Persister persister, + Object record) throws Exception { count(txID); - JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record); + JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, persister, record); writeRecord(updateRecordTX, false, null); } @@ -261,11 +266,6 @@ public final class FileWrapperJournal extends JournalBase { } @Override - public void perfBlast(int pages) { - throw new UnsupportedOperationException(); - } - - @Override public void runDirectJournalBlast() throws Exception { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java index e2ca84d..e6bd99e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java @@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; abstract class JournalBase implements Journal { @@ -37,68 +38,15 @@ abstract class JournalBase implements Journal { } @Override - public abstract void appendAddRecord(final long id, - final byte recordType, - final EncodingSupport record, - final boolean sync, - final IOCompletion callback) throws Exception; - - @Override - public abstract void appendAddRecordTransactional(final long txID, - final long id, - final byte recordType, - final EncodingSupport record) throws Exception; - - @Override - public abstract void appendCommitRecord(final long txID, - final boolean sync, - final IOCompletion callback, - boolean lineUpContext) throws Exception; - - @Override - public abstract void appendDeleteRecord(final long id, - final boolean sync, - final IOCompletion callback) throws Exception; - - @Override - public abstract void appendDeleteRecordTransactional(final long txID, - final long id, - final EncodingSupport record) throws Exception; - - @Override - public abstract void appendPrepareRecord(final long txID, - final EncodingSupport transactionData, - final boolean sync, - final IOCompletion callback) throws Exception; - - @Override - public abstract void appendUpdateRecord(final long id, - final byte recordType, - final EncodingSupport record, - final boolean sync, - final IOCompletion callback) throws Exception; - - @Override - public abstract void appendUpdateRecordTransactional(final long txID, - final long id, - final byte recordType, - final EncodingSupport record) throws Exception; - - @Override - public abstract void appendRollbackRecord(final long txID, - final boolean sync, - final IOCompletion callback) throws Exception; - - @Override public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync); } @Override - public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception { SyncIOCompletion callback = getSyncCallback(sync); - appendAddRecord(id, recordType, record, sync, callback); + appendAddRecord(id, recordType, persister, record, sync, callback); if (callback != null) { callback.waitCompletion(); @@ -176,11 +124,12 @@ abstract class JournalBase implements Journal { @Override public void appendUpdateRecord(final long id, final byte recordType, - final EncodingSupport record, + final Persister persister, + final Object record, final boolean sync) throws Exception { SyncIOCompletion callback = getSyncCallback(sync); - appendUpdateRecord(id, recordType, record, sync, callback); + appendUpdateRecord(id, recordType, persister, record, sync, callback); if (callback != null) { callback.waitCompletion(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java index b95d641..c62b27b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; @@ -252,7 +253,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ @Override public void onReadAddRecord(final RecordInfo info) throws Exception { if (lookupRecord(info.id)) { - JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data)); + JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); addRecord.setCompactCount((short) (info.compactCount + 1)); checkSize(addRecord.getEncodeSize(), info.compactCount); @@ -268,7 +269,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) { JournalTransaction newTransaction = getNewJournalTransaction(transactionID); - JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data)); + JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), EncoderPersister.getInstance(),new ByteArrayEncoding(info.data)); record.setCompactCount((short) (info.compactCount + 1)); @@ -374,7 +375,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ @Override public void onReadUpdateRecord(final RecordInfo info) throws Exception { if (lookupRecord(info.id)) { - JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, new ByteArrayEncoding(info.data)); + JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); updateRecord.setCompactCount((short) (info.compactCount + 1)); @@ -397,7 +398,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) { JournalTransaction newTransaction = getNewJournalTransaction(transactionID); - JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, new ByteArrayEncoding(info.data)); + JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); updateRecordTX.setCompactCount((short) (info.compactCount + 1)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index db615f8..24bb916 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -57,11 +57,11 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TestableJournal; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; -import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalCompleteRecordTX; @@ -713,7 +713,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal @Override public void appendAddRecord(final long id, final byte recordType, - final EncodingSupport record, + final Persister persister, + final Object record, final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); @@ -727,7 +728,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void run() { journalLock.readLock().lock(); try { - JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record); + JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record); JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback); records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize())); @@ -762,7 +763,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal @Override public void appendUpdateRecord(final long id, final byte recordType, - final EncodingSupport record, + final Persister persister, + final Object record, final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); @@ -777,7 +779,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal journalLock.readLock().lock(); try { JournalRecord jrnRecord = records.get(id); - JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record); + JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record); JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback); if (logger.isTraceEnabled()) { @@ -873,7 +875,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, - final EncodingSupport record) throws Exception { + final Persister persister, + final Object record) throws Exception { checkJournalIsLoaded(); final JournalTransaction tx = getTransactionInfo(txID); @@ -885,7 +888,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void run() { journalLock.readLock().lock(); try { - JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record); + JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record); JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); if (logger.isTraceEnabled()) { @@ -952,7 +955,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType, - final EncodingSupport record) throws Exception { + final Persister persister, + final Object record) throws Exception { checkJournalIsLoaded(); final JournalTransaction tx = getTransactionInfo(txID); @@ -965,7 +969,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal journalLock.readLock().lock(); try { - JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record ); + JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, persister, record ); JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null ); if ( logger.isTraceEnabled() ) { @@ -2165,45 +2169,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } - @Override - public void perfBlast(final int pages) { - - checkJournalIsLoaded(); - - final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]); - - final JournalInternalRecord blastRecord = new JournalInternalRecord() { - - @Override - public int getEncodeSize() { - return byteEncoder.getEncodeSize(); - } - - @Override - public void encode(final ActiveMQBuffer buffer) { - byteEncoder.encode(buffer); - } - }; - - appendExecutor.execute(new Runnable() { - @Override - public void run() { - journalLock.readLock().lock(); - try { - - for (int i = 0; i < pages; i++) { - appendRecord(blastRecord, false, false, null, null); - } - - } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e); - } finally { - journalLock.readLock().unlock(); - } - } - }); - } - // ActiveMQComponent implementation // --------------------------------------------------- @@ -2921,5 +2886,4 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public int getCompactCount() { return compactCount; } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java index c6a5d4a..6e5b651 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java @@ -17,14 +17,16 @@ package org.apache.activemq.artemis.core.journal.impl.dataformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; public class JournalAddRecord extends JournalInternalRecord { protected final long id; - protected final EncodingSupport record; + protected final Persister persister; + + protected final Object record; protected final byte recordType; @@ -35,7 +37,7 @@ public class JournalAddRecord extends JournalInternalRecord { * @param recordType * @param record */ - public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record) { + public JournalAddRecord(final boolean add, final long id, final byte recordType, final Persister persister, Object record) { this.id = id; this.record = record; @@ -43,6 +45,8 @@ public class JournalAddRecord extends JournalInternalRecord { this.recordType = recordType; this.add = add; + + this.persister = persister; } @Override @@ -59,17 +63,19 @@ public class JournalAddRecord extends JournalInternalRecord { buffer.writeLong(id); - buffer.writeInt(record.getEncodeSize()); + int recordEncodeSize = persister.getEncodeSize(record); + + buffer.writeInt(persister.getEncodeSize(record)); buffer.writeByte(recordType); - record.encode(buffer); + persister.encode(buffer, record); - buffer.writeInt(getEncodeSize()); + buffer.writeInt(recordEncodeSize + JournalImpl.SIZE_ADD_RECORD + 1); } @Override public int getEncodeSize() { - return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1; + return JournalImpl.SIZE_ADD_RECORD + persister.getEncodeSize(record) + 1; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java index 6cec122..483418f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.core.journal.impl.dataformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; public class JournalAddRecordTX extends JournalInternalRecord { @@ -26,7 +26,9 @@ public class JournalAddRecordTX extends JournalInternalRecord { private final long id; - private final EncodingSupport record; + protected final Persister persister; + + protected final Object record; private final byte recordType; @@ -41,12 +43,15 @@ public class JournalAddRecordTX extends JournalInternalRecord { final long txID, final long id, final byte recordType, - final EncodingSupport record) { + final Persister persister, + Object record) { this.txID = txID; this.id = id; + this.persister = persister; + this.record = record; this.recordType = recordType; @@ -70,17 +75,17 @@ public class JournalAddRecordTX extends JournalInternalRecord { buffer.writeLong(id); - buffer.writeInt(record.getEncodeSize()); + buffer.writeInt(persister.getEncodeSize(record)); buffer.writeByte(recordType); - record.encode(buffer); + persister.encode(buffer, record); buffer.writeInt(getEncodeSize()); } @Override public int getEncodeSize() { - return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1; + return JournalImpl.SIZE_ADD_RECORD_TX + persister.getEncodeSize(record) + 1; } }
