http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 1b7ed43..b5d2c86 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; @@ -34,7 +35,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; @@ -234,7 +234,8 @@ public class AMQSession implements SessionCallback { ServerConsumer consumer, int deliveryCount) { AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); - return theConsumer.handleDeliver(reference, message, deliveryCount); + // TODO: use encoders and proper conversions here + return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount); } @Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index c64c1ea..d0dff4d 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -30,7 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -598,7 +598,7 @@ public final class StompConnection implements RemotingConnection { } } - protected void sendServerMessage(Message message, String txID) throws ActiveMQStompException { + protected void sendServerMessage(ICoreMessage message, String txID) throws ActiveMQStompException { StompSession stompSession = getSession(txID); if (stompSession.isNoLocal()) { @@ -726,7 +726,7 @@ public final class StompConnection implements RemotingConnection { return SERVER_NAME; } - public StompFrame createStompMessage(Message serverMessage, + public StompFrame createStompMessage(ICoreMessage serverMessage, StompSubscription subscription, int deliveryCount) throws Exception { return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 2be0be4..39d2fe9 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -41,7 +41,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; @@ -109,13 +108,6 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St } @Override - public MessageConverter getConverter() { - return null; - } - - // ProtocolManager implementation -------------------------------- - - @Override public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) { StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getScheduledPool(), server.getExecutorFactory()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index d2d42b7..ba706e5 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.zip.Inflater; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; @@ -131,12 +132,11 @@ public class StompSession implements SessionCallback { //TODO-now: fix encoders LargeServerMessageImpl largeMessage = null; - Message newServerMessage = serverMessage; + ICoreMessage newServerMessage = serverMessage.toCore(); try { StompSubscription subscription = subscriptions.get(consumer.getID()); StompFrame frame = null; if (serverMessage.isLargeMessage()) { - newServerMessage = serverMessage.copy(); largeMessage = (LargeServerMessageImpl) serverMessage; LargeBodyEncoder encoder = largeMessage.getBodyEncoder(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 8d13613..1e40d42 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -287,7 +288,7 @@ public abstract class VersionedStompFrameHandler { return response; } - public StompFrame createMessageFrame(Message serverMessage, + public StompFrame createMessageFrame(ICoreMessage serverMessage, StompSubscription subscription, int deliveryCount) throws Exception { StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE); @@ -299,7 +300,7 @@ public abstract class VersionedStompFrameHandler { // TODO-now fix encoders ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer(); - int bodyPos = ((CoreMessage)serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : ((CoreMessage)serverMessage).getEndOfBodyPosition(); + int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition(); buffer.readerIndex(CoreMessage.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java index b14605d..58d18ef 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12; import java.util.concurrent.ScheduledExecutorService; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; @@ -48,7 +48,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 { } @Override - public StompFrame createMessageFrame(Message serverMessage, + public StompFrame createMessageFrame(ICoreMessage serverMessage, StompSubscription subscription, int deliveryCount) throws Exception { StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java index 9f36b7f..0ecbae1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.MessageReference; @@ -48,8 +49,10 @@ public final class OpenTypeSupport { public static CompositeData convert(MessageReference ref) throws OpenDataException { CompositeType ct; + ICoreMessage message = ref.getMessage().toCore(); + Map<String, Object> fields; - byte type = ref.getMessage().getType(); + byte type = message.getType(); switch(type) { case Message.TEXT_TYPE: @@ -128,8 +131,7 @@ public final class OpenTypeSupport { public Map<String, Object> getFields(MessageReference ref) throws OpenDataException { Map<String, Object> rc = new HashMap<>(); - // TODO-now: fix this - Message m = ref.getMessage(); + ICoreMessage m = ref.getMessage().toCore(); rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID()); if (m.getUserID() != null) { rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString()); @@ -270,7 +272,7 @@ public final class OpenTypeSupport { @Override public Map<String, Object> getFields(MessageReference ref) throws OpenDataException { Map<String, Object> rc = super.getFields(ref); - Message m = ref.getMessage(); + ICoreMessage m = ref.getMessage().toCore(); ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); byte[] bytes = new byte[bodyCopy.readableBytes()]; bodyCopy.readBytes(bytes); @@ -291,7 +293,7 @@ public final class OpenTypeSupport { @Override public Map<String, Object> getFields(MessageReference ref) throws OpenDataException { Map<String, Object> rc = super.getFields(ref); - Message m = ref.getMessage(); + ICoreMessage m = ref.getMessage().toCore(); SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString(); rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : ""); return rc; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index aabec54..271b85c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -255,7 +256,7 @@ public final class Page implements Comparable<Page> { if (messages != null) { for (PagedMessage msg : messages) { - if (msg.getMessage().isLargeMessage()) { + if (msg.getMessage() instanceof ICoreMessage && ((ICoreMessage)msg.getMessage()).isLargeMessage()) { LargeServerMessage lmsg = (LargeServerMessage) msg.getMessage(); // Remember, cannot call delete directly here http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java index d50dd2e..7d43a2e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.paging.impl; import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -136,8 +137,8 @@ public class PagedMessageImpl implements PagedMessage { } } - private boolean isLargeMessage() { - return message.isLargeMessage(); + public boolean isLargeMessage() { + return message instanceof ICoreMessage && ((ICoreMessage)message).isLargeMessage(); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 2295987..fc0885f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -85,7 +86,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.jboss.logging.Logger; @@ -491,7 +491,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_SEND: { SessionSendMessage message = (SessionSendMessage) packet; requiresResponse = message.isRequiresResponse(); - message.getMessage().setProtocol(manager); session.send(message.getMessage(), direct); if (requiresResponse) { response = new NullResponseMessage(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index ffaf2cb..cc81fbe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -53,9 +54,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQFrameDecoder2; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -111,16 +110,6 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> { return false; } - /** - * no need to implement this now - * - * @return - */ - @Override - public MessageConverter getConverter() { - return null; - } - @Override public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) { final Configuration config = server.getConfiguration(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 3a09e91..542d726 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -92,10 +92,9 @@ public final class CoreSessionCallback implements SessionCallback { } @Override - public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { - // TODO-now: fix this - Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount); + Packet packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount); int size = 0; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index aa58a7d..6fcc802 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -17,11 +17,11 @@ package org.apache.activemq.artemis.core.server; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage; -public interface LargeServerMessage extends ReplicatedLargeMessage, Message { +public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage { @Override void addBytes(byte[] bytes) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java index ce9c489..ab97b56 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server; import java.util.List; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.transaction.Transaction; /** @@ -94,7 +95,7 @@ public interface ServerConsumer extends Consumer { void individualCancel(final long messageID, boolean failed) throws Exception; - void forceDelivery(long sequence); + void forceDelivery(long sequence) throws ActiveMQException; void setTransferring(boolean transferring); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index a130437..e3c4744 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -507,14 +507,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * there are no other messages to be delivered. */ @Override - public void forceDelivery(final long sequence) { + public void forceDelivery(final long sequence) throws ActiveMQException { forceDelivery(sequence, () -> { Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50); forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); forcedDeliveryMessage.setAddress(messageQueue.getName()); - callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); + try { + callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + }); } @@ -1015,7 +1020,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * @param ref * @param message */ - private void deliverStandardMessage(final MessageReference ref, final Message message) { + private void deliverStandardMessage(final MessageReference ref, final Message message) throws ActiveMQException { int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount()); if (availableCredits != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java index 84ab636..29a2e47 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java @@ -21,7 +21,9 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; @@ -42,8 +44,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; -import org.apache.activemq.artemis.api.core.RoutingType; - import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; @@ -129,5 +129,5 @@ public interface ManagementService extends NotificationService, ActiveMQComponen Object[] getResources(Class<?> resourceType); - Message handleMessage(Message message) throws Exception; + ICoreMessage handleMessage(Message message) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 5b2bf28..cda0a8a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -365,10 +366,10 @@ public class ManagementServiceImpl implements ManagementService { } @Override - public Message handleMessage(Message message) throws Exception { + public ICoreMessage handleMessage(Message message) throws Exception { message = message.toCore(); // a reply message is sent with the result stored in the message body. - Message reply = new CoreMessage(storageManager.generateID(), 512); + CoreMessage reply = new CoreMessage(storageManager.generateID(), 512); String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME); if (logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java index c885341..95036da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.transaction.impl; import javax.transaction.xa.Xid; import java.util.Map; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; - import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionDetail; @@ -32,7 +32,10 @@ public class CoreTransactionDetail extends TransactionDetail { @Override public String decodeMessageType(Message msg) { - int type = msg.getType(); + if (!(msg instanceof ICoreMessage)) { + return "N/A"; + } + int type = ((ICoreMessage)msg).getType(); switch (type) { case Message.DEFAULT_TYPE: // 0 return "Default"; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java index 3a5e2bf..a440e31 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.spi.core.protocol; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; -public interface MessageConverter { +public interface MessageConverter<ProtocolMessage extends Message> { - Message inbound(Object messageInbound) throws Exception; + ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception; - Object outbound(Message messageOutbound, int deliveryCount) throws Exception; + ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java index c2b7334..e29d74d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java @@ -22,9 +22,9 @@ import java.util.Map; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; @@ -53,14 +53,6 @@ public interface ProtocolManager<P extends BaseInterceptor> { boolean isProtocol(byte[] array); /** - * Gets the Message Converter towards ActiveMQ Artemis. - * Notice this being null means no need to convert - * - * @return - */ - MessageConverter getConverter(); - - /** * If this protocols accepts connectoins without an initial handshake. * If true this protocol will be the failback case no other connections are made. * New designed protocols should always require a handshake. This is only useful for legacy protocols. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java index 92204ce..2f18c21 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java @@ -23,7 +23,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.management.ManagementHelper; @@ -44,8 +46,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; -import org.apache.activemq.artemis.api.core.RoutingType; - import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; @@ -330,7 +330,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase { } @Override - public Message handleMessage(Message message) throws Exception { + public ICoreMessage handleMessage(Message message) throws Exception { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 5b44572..522b7d0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -32,15 +32,13 @@ import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.encode.BodyType; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.Consumer; @@ -312,12 +310,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { final long id; @Override - public Message toCore() { - return this; - } - - @Override - public ActiveMQBuffer getReadOnlyBodyBuffer() { + public CoreMessage toCore() { return null; } @@ -389,10 +382,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { public void messageChanged() { } - @Override - public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { - return null; - } @Override public UUID getUserID() { @@ -418,32 +407,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { public ByteBuf getBuffer() { return null; } - - @Override - public Object getProtocol() { - return null; - } - - @Override - public Message setProtocol(Object protocol) { - return null; - } - - @Override - public Object getBody() { - return null; - } - - @Override - public BodyType getBodyType() { - return null; - } - - @Override - public Message setBody(BodyType type, Object body) { - return null; - } - @Override public Message setAddress(String address) { return null; @@ -455,11 +418,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public byte getType() { - return 0; - } - - @Override public boolean isDurable() { return false; } @@ -515,11 +473,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public ActiveMQBuffer getBodyBuffer() { - return null; - } - - @Override public Message putBooleanProperty(SimpleString key, boolean value) { return null; } @@ -785,11 +738,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public Message setType(byte type) { - return null; - } - - @Override public void receiveBuffer(ByteBuf buffer) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index aa64d9f..0bb177d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; @@ -2079,7 +2080,7 @@ public abstract class ActiveMQTestBase extends Assert { } protected Message generateMessage(final long id) { - Message message = new CoreMessage(id, 1000); + ICoreMessage message = new CoreMessage(id, 1000); message.setMessageID(id); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 40f2ebd..16154c1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -25,6 +25,7 @@ import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.SimpleString; @@ -35,9 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.api.core.encode.BodyType; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; -import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -339,6 +338,8 @@ public class AcknowledgeTest extends ActiveMQTestBase { class FakeMessageWithID extends RefCountMessage { + final long id; + @Override public int getPersistSize() { return 0; @@ -354,11 +355,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public Message setProtocol(Object protocol) { - return this; - } - - @Override public void reloadPersistence(ActiveMQBuffer record) { } @@ -369,8 +365,8 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public Message toCore() { - return this; + public ICoreMessage toCore() { + return null; } @Override @@ -382,12 +378,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { public void sendBuffer(ByteBuf buffer, int count) { } - - @Override - public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { - return null; - } - @Override public Message setUserID(Object userID) { return null; @@ -404,18 +394,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public ActiveMQBuffer getReadOnlyBodyBuffer() { - return null; - } - - final long id; - - @Override - public Message setType(byte type) { - return null; - } - - @Override public Message copy() { return null; } @@ -495,26 +473,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public Object getProtocol() { - return null; - } - - @Override - public Object getBody() { - return null; - } - - @Override - public BodyType getBodyType() { - return null; - } - - @Override - public Message setBody(BodyType type, Object body) { - return null; - } - - @Override public Message setAddress(String address) { return null; } @@ -525,11 +483,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public byte getType() { - return 0; - } - - @Override public boolean isDurable() { return false; } @@ -585,11 +538,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public ActiveMQBuffer getBodyBuffer() { - return null; - } - - @Override public Message putBooleanProperty(SimpleString key, boolean value) { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index e2cf2a0..03e9ec3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -196,7 +196,7 @@ public class ConsumerTest extends ActiveMQTestBase { return; } - internalSend(true); + internalSend(true, true); } @Test @@ -207,21 +207,38 @@ public class ConsumerTest extends ActiveMQTestBase { return; } - internalSend(false); + internalSend(false, true); } - public void internalSend(boolean amqp) throws Throwable { + @Test + public void testSendAMQPReceiveCore() throws Throwable { + + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + internalSend(true, false); + } - ConnectionFactory factory; + @Test + public void testSendCoreReceiveAMQP() throws Throwable { - if (amqp) { - factory = new JmsConnectionFactory("amqp://localhost:61616"); - } else { - factory = new ActiveMQConnectionFactory(); + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; } + internalSend(false, true); + } + + public void internalSend(boolean amqpSender, boolean amqpConsumer) throws Throwable { + + ConnectionFactory factoryAMQP = new JmsConnectionFactory("amqp://localhost:61616"); + ConnectionFactory factoryCore = new ActiveMQConnectionFactory(); + - Connection connection = factory.createConnection(); + Connection connection = (amqpSender ? factoryAMQP : factoryCore).createConnection(); try { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -232,7 +249,9 @@ public class ConsumerTest extends ActiveMQTestBase { long time = System.currentTimeMillis(); int NUMBER_OF_MESSAGES = 100; for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { - producer.send(session.createTextMessage("hello " + i)); + TextMessage msg = session.createTextMessage("hello " + i); + msg.setIntProperty("mycount", i); + producer.send(msg); } long end = System.currentTimeMillis(); @@ -245,8 +264,9 @@ public class ConsumerTest extends ActiveMQTestBase { server.start(); } - connection = factory.createConnection(); + connection = (amqpConsumer ? factoryAMQP : factoryCore).createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(QUEUE.toString()); connection.start(); @@ -255,6 +275,7 @@ public class ConsumerTest extends ActiveMQTestBase { for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); + Assert.assertEquals(i, message.getIntProperty("mycount")); Assert.assertEquals("hello " + i, message.getText()); } } finally { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index 5e822eb..025d00a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -40,7 +40,6 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.StoreConfiguration; - import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -350,7 +349,7 @@ public class LargeMessageTest extends LargeMessageTestBase { ClientProducer producer = session.createProducer(ADDRESS); - Message clientFile = session.createMessage(true); + ClientMessage clientFile = session.createMessage(true); for (int i = 0; i < messageSize; i++) { clientFile.getBodyBuffer().writeByte(getSamplebyte(i)); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/ClientExitTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/ClientExitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/ClientExitTest.java index 70c5b22..23fa0a6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/ClientExitTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/ClientExitTest.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.artemis.tests.integration.clientcrash; -import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; @@ -63,7 +63,7 @@ public class ClientExitTest extends ClientTestBase { // read the message from the queue - Message message = consumer.receive(15000); + ClientMessage message = consumer.receive(15000); assertNotNull(message); assertEquals(ClientExitTest.MESSAGE_TEXT, message.getBodyBuffer().readString()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java index 1897bdd..85ed04f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java @@ -28,7 +28,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; -import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.qpid.proton.message.Message; @@ -53,8 +52,6 @@ public class MessageJournalTest extends ActiveMQTestBase { message.getBodyBuffer().writeByte((byte)'Z'); - message.setProtocol(factory.createProtocolManager(server, null, null, null)); - server.getStorageManager().storeMessage(message); server.getStorageManager().stop(); @@ -95,7 +92,7 @@ public class MessageJournalTest extends ActiveMQTestBase { Message protonJMessage = Message.Factory.create(); - AMQPMessage message = new AMQPMessage(protonJMessage, (ProtonProtocolManager)factory.createProtocolManager(server, null, null, null)); + AMQPMessage message = new AMQPMessage(protonJMessage); message.setMessageID(333); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementHelperTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementHelperTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementHelperTest.java index 0719b38..3151408 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementHelperTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementHelperTest.java @@ -20,7 +20,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; @@ -48,7 +48,7 @@ public class ManagementHelperTest extends Assert { String operationName = RandomUtil.randomString(); String param = RandomUtil.randomString(); String[] params = new String[]{RandomUtil.randomString(), RandomUtil.randomString(), RandomUtil.randomString()}; - Message msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000); + ClientMessage msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000); ManagementHelper.putOperationInvocation(msg, resource, operationName, param, params); Object[] parameters = ManagementHelper.retrieveOperationParameters(msg); @@ -135,7 +135,7 @@ public class ManagementHelperTest extends Assert { Object[] params = new Object[]{i, s, d, b, l, map, strArray, maps}; - Message msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000); + ClientMessageImpl msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000); ManagementHelper.putOperationInvocation(msg, resource, operationName, params); Object[] parameters = ManagementHelper.retrieveOperationParameters(msg); @@ -201,7 +201,7 @@ public class ManagementHelperTest extends Assert { Object[] params = new Object[]{"hello", map}; - Message msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000); + ClientMessageImpl msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000); ManagementHelper.putOperationInvocation(msg, resource, operationName, params); Object[] parameters = ManagementHelper.retrieveOperationParameters(msg); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java index b6ea147..151341f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.management; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.AddressControl; @@ -50,7 +51,7 @@ public class ManagementServiceImplTest extends ActiveMQTestBase { server.start(); // invoke attribute and operation on the server - Message message = new CoreMessage(1, 100); + CoreMessage message = new CoreMessage(1, 100); ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, "createQueue", queue, address); Message reply = server.getManagementService().handleMessage(message); @@ -66,10 +67,10 @@ public class ManagementServiceImplTest extends ActiveMQTestBase { server.start(); // invoke attribute and operation on the server - Message message = new CoreMessage(1, 100); + CoreMessage message = new CoreMessage(1, 100); ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, "thereIsNoSuchOperation"); - Message reply = server.getManagementService().handleMessage(message); + ICoreMessage reply = server.getManagementService().handleMessage(message); Assert.assertFalse(ManagementHelper.hasOperationSucceeded(reply)); Assert.assertNotNull(ManagementHelper.getResult(reply)); @@ -83,10 +84,10 @@ public class ManagementServiceImplTest extends ActiveMQTestBase { server.start(); // invoke attribute and operation on the server - Message message = new CoreMessage(1, 100); + ICoreMessage message = new CoreMessage(1, 100); ManagementHelper.putOperationInvocation(message, "Resouce.Does.Not.Exist", "toString"); - Message reply = server.getManagementService().handleMessage(message); + ICoreMessage reply = server.getManagementService().handleMessage(message); Assert.assertFalse(ManagementHelper.hasOperationSucceeded(reply)); Assert.assertNotNull(ManagementHelper.getResult(reply)); @@ -100,11 +101,11 @@ public class ManagementServiceImplTest extends ActiveMQTestBase { server.start(); // invoke attribute and operation on the server - Message message = new CoreMessage(1, 100); + ICoreMessage message = new CoreMessage(1, 100); ManagementHelper.putAttribute(message, ResourceNames.BROKER, "started"); - Message reply = server.getManagementService().handleMessage(message); + ICoreMessage reply = server.getManagementService().handleMessage(message); Assert.assertTrue(ManagementHelper.hasOperationSucceeded(reply)); Assert.assertTrue((Boolean) ManagementHelper.getResult(reply)); @@ -118,11 +119,11 @@ public class ManagementServiceImplTest extends ActiveMQTestBase { server.start(); // invoke attribute and operation on the server - Message message = new CoreMessage(1, 100); + ICoreMessage message = new CoreMessage(1, 100); ManagementHelper.putAttribute(message, ResourceNames.BROKER, "attribute.Does.Not.Exist"); - Message reply = server.getManagementService().handleMessage(message); + ICoreMessage reply = server.getManagementService().handleMessage(message); Assert.assertFalse(ManagementHelper.hasOperationSucceeded(reply)); Assert.assertNotNull(ManagementHelper.getResult(reply)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java index 1f0d7e0..3675416 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -289,7 +288,7 @@ public class PagingSendTest extends ActiveMQTestBase { List<String> messageIds = new ArrayList<>(); ClientProducer producer = session.createProducer(queueAddr); for (int i = 0; i < batchSize; i++) { - Message message = session.createMessage(true); + ClientMessage message = session.createMessage(true); message.getBodyBuffer().writeBytes(new byte[1024]); String id = UUID.randomUUID().toString(); message.putStringProperty("id", id); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 1714947..48127d2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -42,6 +42,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -75,7 +76,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; @@ -5536,7 +5536,7 @@ public class PagingTest extends ActiveMQTestBase { for (int i = 0; i < 100; i++) { Message msg = session.createMessage(true); - msg.getBodyBuffer().writeBytes(new byte[1024]); + msg.toCore().getBodyBuffer().writeBytes(new byte[1024]); prod.send(msg); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java index cba3008..ec49ece 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java @@ -425,7 +425,7 @@ public class ScaleDownTest extends ClusterTestBase { while (!servers[0].getPagingManager().getPageStore(new SimpleString(addressName)).isPaging()) { for (int i = 0; i < CHUNK_SIZE; i++) { - Message message = session.createMessage(true); + ClientMessage message = session.createMessage(true); message.getBodyBuffer().writeBytes(new byte[1024]); producer.send(message); messageCount++; @@ -463,7 +463,7 @@ public class ScaleDownTest extends ClusterTestBase { while (!servers[0].getPagingManager().getPageStore(new SimpleString(addressName)).isPaging()) { for (int i = 0; i < CHUNK_SIZE; i++) { - Message message = session.createMessage(true); + ClientMessage message = session.createMessage(true); message.getBodyBuffer().writeBytes(new byte[1024]); message.putIntProperty("order", i); producer.send(message); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverOneWaySSLTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverOneWaySSLTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverOneWaySSLTest.java index 89f7a60..141a6b8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverOneWaySSLTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverOneWaySSLTest.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -41,7 +42,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.RandomUtil; @@ -127,7 +127,7 @@ public class CoreClientOverOneWaySSLTest extends ActiveMQTestBase { ClientConsumer consumer = addClientConsumer(session.createConsumer(CoreClientOverOneWaySSLTest.QUEUE)); session.start(); - Message m = consumer.receive(1000); + ClientMessage m = consumer.receive(1000); Assert.assertNotNull(m); Assert.assertEquals(text, m.getBodyBuffer().readString()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverTwoWaySSLTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverTwoWaySSLTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverTwoWaySSLTest.java index 772e44d..11b3b0b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverTwoWaySSLTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverTwoWaySSLTest.java @@ -26,7 +26,6 @@ import io.netty.handler.ssl.SslHandler; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; import org.apache.activemq.artemis.api.core.Interceptor; -import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -151,7 +150,7 @@ public class CoreClientOverTwoWaySSLTest extends ActiveMQTestBase { ClientConsumer consumer = session.createConsumer(CoreClientOverTwoWaySSLTest.QUEUE); session.start(); - Message m = consumer.receive(1000); + ClientMessage m = consumer.receive(1000); Assert.assertNotNull(m); Assert.assertEquals(text, m.getBodyBuffer().readString()); } @@ -189,7 +188,7 @@ public class CoreClientOverTwoWaySSLTest extends ActiveMQTestBase { ClientConsumer consumer = session.createConsumer(CoreClientOverTwoWaySSLTest.QUEUE); session.start(); - Message m = consumer.receive(1000); + ClientMessage m = consumer.receive(1000); Assert.assertNotNull(m); Assert.assertEquals(text, m.getBodyBuffer().readString()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java index 2e0ffac..e88097a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; @@ -46,9 +47,9 @@ public class MessageImplTest extends ActiveMQTestBase { final long expiration = RandomUtil.randomLong(); final long timestamp = RandomUtil.randomLong(); final byte priority = RandomUtil.randomByte(); - Message message1 = new ClientMessageImpl(type, durable, expiration, timestamp, priority, 100); + ICoreMessage message1 = new ClientMessageImpl(type, durable, expiration, timestamp, priority, 100); - Message message = message1; + ICoreMessage message = message1; Assert.assertEquals(type, message.getType()); Assert.assertEquals(durable, message.isDurable()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java index 37e33ed..847e8b7 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.paging.impl; import java.nio.ByteBuffer; import java.util.List; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -207,7 +207,7 @@ public class PageTest extends ActiveMQTestBase { int initialNumberOfMessages = page.getNumberOfMessages(); for (int i = 0; i < numberOfElements; i++) { - Message msg = new CoreMessage().initBuffer(100); + ICoreMessage msg = new CoreMessage().initBuffer(100); for (int j = 0; j < 10; j++) { msg.getBodyBuffer().writeByte((byte) 'b'); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java index 654fd89..60f7a15 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -63,7 +63,7 @@ public class PagingManagerImplTest extends ActiveMQTestBase { PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test")); - Message msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10)); + ICoreMessage msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10)); final RoutingContextImpl ctx = new RoutingContextImpl(null); Assert.assertFalse(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock)); @@ -82,7 +82,7 @@ public class PagingManagerImplTest extends ActiveMQTestBase { Assert.assertEquals(1, msgs.size()); - ActiveMQTestBase.assertEqualsByteArrays(msg.getBodyBuffer().writerIndex(), msg.getBodyBuffer().toByteBuffer().array(), (msgs.get(0).getMessage()).getBodyBuffer().toByteBuffer().array()); + ActiveMQTestBase.assertEqualsByteArrays(msg.getBodyBuffer().writerIndex(), msg.getBodyBuffer().toByteBuffer().array(), (msgs.get(0).getMessage()).toCore().getBodyBuffer().toByteBuffer().array()); Assert.assertTrue(store.isPaging()); @@ -104,10 +104,10 @@ public class PagingManagerImplTest extends ActiveMQTestBase { pageDirDir.mkdirs(); } - protected Message createMessage(final long messageId, - final SimpleString destination, - final ByteBuffer buffer) { - Message msg = new CoreMessage(messageId, 200); + protected ICoreMessage createMessage(final long messageId, + final SimpleString destination, + final ByteBuffer buffer) { + ICoreMessage msg = new CoreMessage(messageId, 200); msg.setAddress(destination); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 905e550..af58a53 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -224,7 +224,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { for (int i = 0; i < numMessages; i++) { ActiveMQBuffer horn1 = buffers.get(i); - ActiveMQBuffer horn2 = msg.get(i).getMessage().getBodyBuffer(); + ActiveMQBuffer horn2 = msg.get(i).getMessage().toCore().getBodyBuffer(); horn1.resetReaderIndex(); horn2.resetReaderIndex(); for (int j = 0; j < horn1.writerIndex(); j++) { @@ -290,7 +290,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { for (int i = 0; i < 5; i++) { Assert.assertEquals(sequence++, msg.get(i).getMessage().getMessageID()); - ActiveMQTestBase.assertEqualsBuffers(18, buffers.get(pageNr * 5 + i), msg.get(i).getMessage().getBodyBuffer()); + ActiveMQTestBase.assertEqualsBuffers(18, buffers.get(pageNr * 5 + i), msg.get(i).getMessage().toCore().getBodyBuffer()); } } @@ -341,7 +341,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { Assert.assertEquals(1L, msgs.get(0).getMessage().getMessageID()); - ActiveMQTestBase.assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage().getBodyBuffer()); + ActiveMQTestBase.assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage().toCore().getBodyBuffer()); Assert.assertEquals(1, store.getNumberOfPages()); @@ -485,14 +485,14 @@ public class PagingStoreImplTest extends ActiveMQTestBase { page.close(); for (PagedMessage msg : msgs) { - long id = msg.getMessage().getBodyBuffer().readLong(); - msg.getMessage().getBodyBuffer().resetReaderIndex(); + long id = msg.getMessage().toCore().getBodyBuffer().readLong(); + msg.getMessage().toCore().getBodyBuffer().resetReaderIndex(); Message msgWritten = buffers.remove(id); buffers2.put(id, msg.getMessage()); Assert.assertNotNull(msgWritten); Assert.assertEquals(msg.getMessage().getAddress(), msgWritten.getAddressSimpleString()); - ActiveMQTestBase.assertEqualsBuffers(10, msgWritten.getBodyBuffer(), msg.getMessage().getBodyBuffer()); + ActiveMQTestBase.assertEqualsBuffers(10, msgWritten.toCore().getBodyBuffer(), msg.getMessage().toCore().getBodyBuffer()); } } @@ -547,11 +547,11 @@ public class PagingStoreImplTest extends ActiveMQTestBase { for (PagedMessage msg : msgs) { - long id = msg.getMessage().getBodyBuffer().readLong(); + long id = msg.getMessage().toCore().getBodyBuffer().readLong(); Message msgWritten = buffers2.remove(id); Assert.assertNotNull(msgWritten); Assert.assertEquals(msg.getMessage().getAddress(), msgWritten.getAddressSimpleString()); - ActiveMQTestBase.assertEqualsByteArrays(msgWritten.getBodyBuffer().writerIndex(), msgWritten.getBodyBuffer().toByteBuffer().array(), msg.getMessage().getBodyBuffer().toByteBuffer().array()); + ActiveMQTestBase.assertEqualsByteArrays(msgWritten.toCore().getBodyBuffer().writerIndex(), msgWritten.toCore().getBodyBuffer().toByteBuffer().array(), msg.getMessage().toCore().getBodyBuffer().toByteBuffer().array()); } } @@ -560,8 +560,8 @@ public class PagingStoreImplTest extends ActiveMQTestBase { lastPage.close(); Assert.assertEquals(1, lastMessages.size()); - lastMessages.get(0).getMessage().getBodyBuffer().resetReaderIndex(); - Assert.assertEquals(lastMessages.get(0).getMessage().getBodyBuffer().readLong(), lastMessageId); + lastMessages.get(0).getMessage().toCore().getBodyBuffer().resetReaderIndex(); + Assert.assertEquals(lastMessages.get(0).getMessage().toCore().getBodyBuffer().readLong(), lastMessageId); Assert.assertEquals(0, buffers2.size()); @@ -739,11 +739,11 @@ public class PagingStoreImplTest extends ActiveMQTestBase { }; } - private Message createMessage(final long id, + private CoreMessage createMessage(final long id, final PagingStore store, final SimpleString destination, final ActiveMQBuffer buffer) { - Message msg = new CoreMessage(id, 50 + buffer.capacity()); + CoreMessage msg = new CoreMessage(id, 50 + buffer.capacity()); msg.setAddress(destination);
