http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 548b62c..b997d80 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -17,10 +17,12 @@ package org.apache.activemq.artemis.core.protocol.mqtt; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -43,11 +45,11 @@ public class MQTTSessionCallback implements SessionCallback { @Override public int sendMessage(MessageReference reference, - ServerMessage message, + Message message, ServerConsumer consumer, int deliveryCount) { try { - session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount); + session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount); } catch (Exception e) { log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); } @@ -70,7 +72,7 @@ public class MQTTSessionCallback implements SessionCallback { @Override public int sendLargeMessage(MessageReference reference, - ServerMessage message, + Message message, ServerConsumer consumer, long bodySize, int deliveryCount) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 7bc6b84..6891497 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -28,8 +28,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.config.WildcardConfiguration; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; /** * A Utility Class for creating Server Side objects and converting MQTT concepts to/from Artemis. @@ -93,13 +92,13 @@ public class MQTTUtil { return MQTT_RETAIN_ADDRESS_PREFIX + MQTT_WILDCARD.convert(filter, wildcardConfiguration); } - private static ServerMessage createServerMessage(MQTTSession session, + private static Message createServerMessage(MQTTSession session, SimpleString address, boolean retain, int qos) { long id = session.getServer().getStorageManager().generateID(); - ServerMessageImpl message = new ServerMessageImpl(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE); + CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE); message.setAddress(address); message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain); message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos); @@ -107,21 +106,21 @@ public class MQTTUtil { return message; } - public static ServerMessage createServerMessageFromByteBuf(MQTTSession session, + public static Message createServerMessageFromByteBuf(MQTTSession session, String topic, boolean retain, int qos, ByteBuf payload) { String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration()); - ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); + Message message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); // FIXME does this involve a copy? message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes()); return message; } - public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId) { - ServerMessage message = createServerMessage(session, address, false, 1); + public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) { + Message message = createServerMessage(session, address, false, 1); message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId); message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value()); return message; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 9b27b81..550a63a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -36,11 +36,10 @@ import java.util.zip.InflaterOutputStream; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.utils.DataConstants; @@ -102,16 +101,16 @@ public class OpenWireMessageConverter implements MessageConverter { } @Override - public Object outbound(ServerMessage message, int deliveryCount) { + public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) { // TODO: implement this return null; } @Override - public ServerMessage inbound(Object message) throws Exception { + public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception { Message messageSend = (Message) message; - ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize()); + CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize()); String type = messageSend.getType(); if (type != null) { @@ -157,7 +156,7 @@ public class OpenWireMessageConverter implements MessageConverter { mdataIn.close(); TypedProperties props = new TypedProperties(); loadMapIntoProperties(props, map); - props.encode(body); + props.encode(body.byteBuf()); break; case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE: if (messageCompressed) { @@ -415,8 +414,9 @@ public class OpenWireMessageConverter implements MessageConverter { } public static MessageDispatch createMessageDispatch(MessageReference reference, - ServerMessage message, + org.apache.activemq.artemis.api.core.Message message, AMQConsumer consumer) throws IOException, JMSException { + // TODO-now: use new Encode here ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination()); //we can use core message id for sequenceId @@ -433,7 +433,7 @@ public class OpenWireMessageConverter implements MessageConverter { } private static ActiveMQMessage toAMQMessage(MessageReference reference, - ServerMessage coreMessage, + org.apache.activemq.artemis.api.core.Message coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { ActiveMQMessage amqMsg = null; @@ -476,7 +476,7 @@ public class OpenWireMessageConverter implements MessageConverter { } amqMsg.setBrokerInTime(brokerInTime); - ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate(); + ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer(); Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); boolean isCompressed = compressProp == null ? false : compressProp.booleanValue(); amqMsg.setCompressed(isCompressed); @@ -503,7 +503,7 @@ public class OpenWireMessageConverter implements MessageConverter { TypedProperties mapData = new TypedProperties(); //it could be a null map if (buffer.readableBytes() > 0) { - mapData.decode(buffer); + mapData.decode(buffer.byteBuf()); Map<String, Object> map = mapData.getMap(); ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); OutputStream os = out; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index f471a2a..6f83c2d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; @@ -35,7 +36,6 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -208,7 +208,7 @@ public class AMQConsumer { } - public int handleDeliver(MessageReference reference, ServerMessage message, int deliveryCount) { + public int handleDeliver(MessageReference reference, Message message, int deliveryCount) { MessageDispatch dispatch; try { if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) { @@ -394,7 +394,7 @@ public class AMQConsumer { } } - public boolean checkForcedConsumer(ServerMessage message) { + public boolean checkForcedConsumer(Message message) { if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) { if (next >= 0) { if (timeout <= 0) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 79004ae..1b7ed43 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.reader.MessageUtil; @@ -231,7 +230,7 @@ public class AMQSession implements SessionCallback { @Override public int sendMessage(MessageReference reference, - ServerMessage message, + org.apache.activemq.artemis.api.core.Message message, ServerConsumer consumer, int deliveryCount) { AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); @@ -240,7 +239,7 @@ public class AMQSession implements SessionCallback { @Override public int sendLargeMessage(MessageReference reference, - ServerMessage message, + org.apache.activemq.artemis.api.core.Message message, ServerConsumer consumerID, long bodySize, int deliveryCount) { @@ -296,7 +295,7 @@ public class AMQSession implements SessionCallback { actualDestinations = new ActiveMQDestination[]{destination}; } - ServerMessage originalCoreMsg = getConverter().inbound(messageSend); + org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend); if (connection.isNoLocal()) { //Note: advisory messages are dealt with in @@ -324,7 +323,7 @@ public class AMQSession implements SessionCallback { for (int i = 0; i < actualDestinations.length; i++) { ActiveMQDestination dest = actualDestinations[i]; SimpleString address = new SimpleString(dest.getPhysicalName()); - ServerMessage coreMsg = originalCoreMsg.copy(); + org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy(); coreMsg.setAddress(address); if (actualDestinations[i].isQueue()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java index 5355c63..2686907 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java @@ -18,8 +18,9 @@ package org.apache.activemq.artemis.core.protocol.openwire.util; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.config.WildcardConfiguration; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -53,8 +54,8 @@ public class OpenWireUtil { * set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the * consumer */ - public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) { - String address = message.getAddress().toString(); + public static ActiveMQDestination toAMQAddress(Message message, ActiveMQDestination actualDestination) { + String address = message.getAddress(); String strippedAddress = address;//.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, ""); if (actualDestination.isQueue()) { return new ActiveMQQueue(strippedAddress); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java index 861c524..d377abd 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol.stomp; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.jboss.logging.Messages; import org.jboss.logging.annotations.Cause; import org.jboss.logging.annotations.Message; @@ -71,7 +70,7 @@ public interface ActiveMQStompProtocolMessageBundle { ActiveMQStompException invalidConnection(); @Message(id = 339011, value = "Error sending message {0}", format = Message.Format.MESSAGE_FORMAT) - ActiveMQStompException errorSendMessage(ServerMessageImpl message, @Cause Exception e); + ActiveMQStompException errorSendMessage(org.apache.activemq.artemis.api.core.Message message, @Cause Exception e); @Message(id = 339012, value = "Error beginning a transaction {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQStompException errorBeginTx(String txID, @Cause Exception e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index c004a0e..c64c1ea 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -30,18 +30,18 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; @@ -569,7 +569,7 @@ public final class StompConnection implements RemotingConnection { return valid; } - public ServerMessageImpl createServerMessage() { + public CoreMessage createServerMessage() { return manager.createServerMessage(); } @@ -598,7 +598,7 @@ public final class StompConnection implements RemotingConnection { } } - protected void sendServerMessage(ServerMessageImpl message, String txID) throws ActiveMQStompException { + protected void sendServerMessage(Message message, String txID) throws ActiveMQStompException { StompSession stompSession = getSession(txID); if (stompSession.isNoLocal()) { @@ -611,7 +611,7 @@ public final class StompConnection implements RemotingConnection { if (minLargeMessageSize == -1 || (message.getBodyBuffer().writerIndex() < minLargeMessageSize)) { stompSession.sendInternal(message, false); } else { - stompSession.sendInternalLarge(message, false); + stompSession.sendInternalLarge((CoreMessage)message, false); } } catch (Exception e) { throw BUNDLE.errorSendMessage(message, e).setHandler(frameHandler); @@ -726,7 +726,7 @@ public final class StompConnection implements RemotingConnection { return SERVER_NAME; } - public StompFrame createStompMessage(ServerMessage serverMessage, + public StompFrame createStompMessage(Message serverMessage, StompSubscription subscription, int deliveryCount) throws Exception { return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 54339a4..2be0be4 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -33,12 +33,12 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; @@ -345,8 +345,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St return validated; } - public ServerMessageImpl createServerMessage() { - return new ServerMessageImpl(server.getStorageManager().generateID(), 512); + public CoreMessage createServerMessage() { + return new CoreMessage(server.getStorageManager().generateID(), 512); } public void commitTransaction(StompConnection connection, String txID) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 1e103e9..d2d42b7 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -28,20 +28,18 @@ import java.util.zip.Inflater; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.BodyEncoder; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -127,11 +125,13 @@ public class StompSession implements SessionCallback { @Override public int sendMessage(MessageReference ref, - ServerMessage serverMessage, + Message serverMessage, final ServerConsumer consumer, int deliveryCount) { + + //TODO-now: fix encoders LargeServerMessageImpl largeMessage = null; - ServerMessage newServerMessage = serverMessage; + Message newServerMessage = serverMessage; try { StompSubscription subscription = subscriptions.get(consumer.getID()); StompFrame frame = null; @@ -139,20 +139,23 @@ public class StompSession implements SessionCallback { newServerMessage = serverMessage.copy(); largeMessage = (LargeServerMessageImpl) serverMessage; - BodyEncoder encoder = largeMessage.getBodyEncoder(); + LargeBodyEncoder encoder = largeMessage.getBodyEncoder(); encoder.open(); int bodySize = (int) encoder.getLargeBodySize(); + // TODO-now: Convert large mesasge body into the stomp message //large message doesn't have a body. - ((ServerMessageImpl) newServerMessage).createBody(bodySize); - encoder.encode(newServerMessage.getBodyBuffer(), bodySize); - encoder.close(); + // ((Message) newServerMessage).createBody(bodySize); +// encoder.encode(((ServerMessage)newServerMessage).getBodyBuffer(), bodySize); +// encoder.close(); + + throw new RuntimeException("Large message body won't work with stomp now"); } if (serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) { //decompress ActiveMQBuffer qbuff = newServerMessage.getBodyBuffer(); - int bytesToRead = qbuff.writerIndex() - MessageImpl.BODY_OFFSET; + int bytesToRead = qbuff.writerIndex() - CoreMessage.BODY_OFFSET; Inflater inflater = new Inflater(); inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer())); @@ -219,7 +222,7 @@ public class StompSession implements SessionCallback { @Override public int sendLargeMessage(MessageReference ref, - ServerMessage msg, + Message msg, ServerConsumer consumer, long bodySize, int deliveryCount) { @@ -370,11 +373,11 @@ public class StompSession implements SessionCallback { this.noLocal = noLocal; } - public void sendInternal(ServerMessageImpl message, boolean direct) throws Exception { + public void sendInternal(Message message, boolean direct) throws Exception { session.send(message, direct); } - public void sendInternalLarge(ServerMessageImpl message, boolean direct) throws Exception { + public void sendInternalLarge(CoreMessage message, boolean direct) throws Exception { int headerSize = message.getHeadersAndPropertiesEncodeSize(); if (headerSize >= connection.getMinLargeMessageSize()) { throw BUNDLE.headerTooBig(); @@ -384,7 +387,7 @@ public class StompSession implements SessionCallback { long id = storageManager.generateID(); LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message); - byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - MessageImpl.BODY_OFFSET]; + byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET]; message.getBodyBuffer().readBytes(bytes); largeMessage.addBytes(bytes); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java index affab84..7db9d82 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java @@ -24,8 +24,6 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.reader.MessageUtil; public class StompUtils { @@ -37,7 +35,7 @@ public class StompUtils { // Static -------------------------------------------------------- - public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception { + public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg) throws Exception { Map<String, String> headers = new HashMap<>(frame.getHeadersMap()); String priority = headers.remove(Stomp.Headers.Send.PRIORITY); @@ -79,7 +77,7 @@ public class StompUtils { } } - public static void copyStandardHeadersFromMessageToFrame(MessageInternal message, + public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame command, int deliveryCount) throws Exception { command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID())); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index f91ba82..8d13613 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -21,15 +21,13 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers; import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.ExecutorFactory; @@ -180,7 +178,7 @@ public abstract class VersionedStompFrameHandler { long timestamp = System.currentTimeMillis(); - ServerMessageImpl message = connection.createServerMessage(); + CoreMessage message = connection.createServerMessage(); if (routingType != null) { message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType()); } @@ -289,7 +287,7 @@ public abstract class VersionedStompFrameHandler { return response; } - public StompFrame createMessageFrame(ServerMessage serverMessage, + public StompFrame createMessageFrame(Message serverMessage, StompSubscription subscription, int deliveryCount) throws Exception { StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE); @@ -298,11 +296,12 @@ public abstract class VersionedStompFrameHandler { frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID()); } - ActiveMQBuffer buffer = serverMessage.getBodyBufferDuplicate(); + // TODO-now fix encoders + ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer(); - int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition(); + int bodyPos = ((CoreMessage)serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : ((CoreMessage)serverMessage).getEndOfBodyPosition(); - buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT); + buffer.readerIndex(CoreMessage.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT); int size = bodyPos - buffer.readerIndex(); @@ -321,7 +320,7 @@ public abstract class VersionedStompFrameHandler { } frame.setByteBody(data); - StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount); + StompUtils.copyStandardHeadersFromMessageToFrame((serverMessage), frame, deliveryCount); return frame; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java index 6b211d2..b14605d 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; @@ -27,7 +28,6 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompSubscription; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.utils.ExecutorFactory; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; @@ -48,7 +48,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 { } @Override - public StompFrame createMessageFrame(ServerMessage serverMessage, + public StompFrame createMessageFrame(Message serverMessage, StompSubscription subscription, int deliveryCount) throws Exception { StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 7881470..30d6668 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -750,10 +750,6 @@ public interface Configuration { Configuration setLogJournalWriteRate(boolean rate); - int getJournalPerfBlastPages(); - - Configuration setJournalPerfBlastPages(int pages); - long getServerDumpInterval(); Configuration setServerDumpInterval(long interval); @@ -766,10 +762,6 @@ public interface Configuration { Configuration setMemoryMeasureInterval(long memoryMeasureInterval); - boolean isRunSyncSpeedTest(); - - Configuration setRunSyncSpeedTest(boolean run); - // Paging Properties -------------------------------------------------------------------- /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index f4eda91..329f654 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -193,10 +193,6 @@ public class ConfigurationImpl implements Configuration, Serializable { protected boolean logJournalWriteRate = ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate(); - protected int journalPerfBlastPages = ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(); - - protected boolean runSyncSpeedTest = ActiveMQDefaultConfiguration.isDefaultRunSyncSpeedTest(); - private WildcardConfiguration wildcardConfiguration = new WildcardConfiguration(); private boolean messageCounterEnabled = ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled(); @@ -854,28 +850,6 @@ public class ConfigurationImpl implements Configuration, Serializable { } @Override - public int getJournalPerfBlastPages() { - return journalPerfBlastPages; - } - - @Override - public ConfigurationImpl setJournalPerfBlastPages(final int journalPerfBlastPages) { - this.journalPerfBlastPages = journalPerfBlastPages; - return this; - } - - @Override - public boolean isRunSyncSpeedTest() { - return runSyncSpeedTest; - } - - @Override - public ConfigurationImpl setRunSyncSpeedTest(final boolean run) { - runSyncSpeedTest = run; - return this; - } - - @Override public boolean isCreateBindingsDir() { return createBindingsDir; } @@ -1556,7 +1530,6 @@ public class ConfigurationImpl implements Configuration, Serializable { result = prime * result + journalMaxIO_AIO; result = prime * result + journalMaxIO_NIO; result = prime * result + journalMinFiles; - result = prime * result + journalPerfBlastPages; result = prime * result + (journalSyncNonTransactional ? 1231 : 1237); result = prime * result + (journalSyncTransactional ? 1231 : 1237); result = prime * result + ((journalType == null) ? 0 : journalType.hashCode()); @@ -1580,7 +1553,6 @@ public class ConfigurationImpl implements Configuration, Serializable { result = prime * result + (persistIDCache ? 1231 : 1237); result = prime * result + (persistenceEnabled ? 1231 : 1237); result = prime * result + ((queueConfigurations == null) ? 0 : queueConfigurations.hashCode()); - result = prime * result + (runSyncSpeedTest ? 1231 : 1237); result = prime * result + scheduledThreadPoolMaxSize; result = prime * result + (securityEnabled ? 1231 : 1237); result = prime * result + (populateValidatedUser ? 1231 : 1237); @@ -1723,8 +1695,6 @@ public class ConfigurationImpl implements Configuration, Serializable { return false; if (journalMinFiles != other.journalMinFiles) return false; - if (journalPerfBlastPages != other.journalPerfBlastPages) - return false; if (journalSyncNonTransactional != other.journalSyncNonTransactional) return false; if (journalSyncTransactional != other.journalSyncTransactional) @@ -1793,8 +1763,6 @@ public class ConfigurationImpl implements Configuration, Serializable { return false; } else if (!queueConfigurations.equals(other.queueConfigurations)) return false; - if (runSyncSpeedTest != other.runSyncSpeedTest) - return false; if (scheduledThreadPoolMaxSize != other.scheduledThreadPoolMaxSize) return false; if (securityEnabled != other.securityEnabled) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index cea0598..4055b5c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -548,10 +548,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setLogJournalWriteRate(getBoolean(e, "log-journal-write-rate", ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate())); - config.setJournalPerfBlastPages(getInteger(e, "perf-blast-pages", ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), Validators.MINUS_ONE_OR_GT_ZERO)); - - config.setRunSyncSpeedTest(getBoolean(e, "run-sync-speed-test", config.isRunSyncSpeedTest())); - if (e.hasAttribute("wild-card-routing-enabled")) { config.setWildcardRoutingEnabled(getBoolean(e, "wild-card-routing-enabled", config.isWildcardRoutingEnabled())); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java index 41d5e54..3737e19 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java @@ -16,8 +16,8 @@ */ package org.apache.activemq.artemis.core.filter; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerMessage; public interface Filter { @@ -31,7 +31,7 @@ public interface Filter { */ String GENERIC_IGNORED_FILTER = "__AMQX=-1"; - boolean match(ServerMessage message); + boolean match(Message message); SimpleString getFilterString(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java index 0a459c9..9d321c7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java @@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.filter.impl; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.FilterConstants; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.selector.filter.BooleanExpression; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.filter.Filterable; @@ -103,7 +103,7 @@ public class FilterImpl implements Filter { } @Override - public synchronized boolean match(final ServerMessage message) { + public synchronized boolean match(final Message message) { try { boolean result = booleanExpression.matches(new FilterableServerMessage(message)); return result; @@ -148,7 +148,7 @@ public class FilterImpl implements Filter { // Private -------------------------------------------------------------------------- - private static Object getHeaderFieldValue(final ServerMessage msg, final SimpleString fieldName) { + private static Object getHeaderFieldValue(final Message msg, final SimpleString fieldName) { if (FilterConstants.ACTIVEMQ_USERID.equals(fieldName)) { if (msg.getUserID() == null) { // Proton stores JMSMessageID as NATIVE_MESSAGE_ID that is an arbitrary string @@ -178,9 +178,9 @@ public class FilterImpl implements Filter { private static class FilterableServerMessage implements Filterable { - private final ServerMessage message; + private final Message message; - private FilterableServerMessage(ServerMessage message) { + private FilterableServerMessage(Message message) { this.message = message; } @@ -191,7 +191,7 @@ public class FilterImpl implements Filter { result = getHeaderFieldValue(message, new SimpleString(id)); } if (result == null) { - result = message.getObjectProperty(new SimpleString(id)); + result = message.getObjectProperty(id); } if (result != null) { if (result.getClass() == SimpleString.class) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index 09dd702..31e056c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -25,10 +25,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -40,9 +42,7 @@ import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityStore; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -282,7 +282,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro return null; } }); - ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50); + CoreMessage message = new CoreMessage(storageManager.generateID(), 50); for (String header : headers.keySet()) { message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header))); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 4b84909..5ecea64 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -39,7 +39,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.messagecounter.MessageCounter; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -53,8 +53,6 @@ import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -609,7 +607,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { try { Filter singleMessageFilter = new Filter() { @Override - public boolean match(ServerMessage message) { + public boolean match(Message message) { return message.getMessageID() == messageID; } @@ -738,7 +736,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { return null; } }); - ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50); + CoreMessage message = new CoreMessage(storageManager.generateID(), 50); for (String header : headers.keySet()) { message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header))); } @@ -755,7 +753,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { message.setAddress(queue.getAddress()); ByteBuffer buffer = ByteBuffer.allocate(8); buffer.putLong(queue.getID()); - message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); + message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array()); postOffice.route(message, true); return "" + message.getMessageID(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java index ec6848b..9f36b7f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java @@ -35,7 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.ServerMessage; + public final class OpenTypeSupport { @@ -128,6 +128,7 @@ public final class OpenTypeSupport { public Map<String, Object> getFields(MessageReference ref) throws OpenDataException { Map<String, Object> rc = new HashMap<>(); + // TODO-now: fix this Message m = ref.getMessage(); rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID()); if (m.getUserID() != null) { @@ -143,6 +144,11 @@ public final class OpenTypeSupport { rc.put(CompositeDataConstants.PRIORITY, m.getPriority()); rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1); + ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); + byte[] bytes = new byte[bodyCopy.readableBytes()]; + bodyCopy.readBytes(bytes); + rc.put(CompositeDataConstants.BODY, bytes); + Map<String, Object> propertyMap = m.toPropertyMap(); rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap); @@ -264,8 +270,8 @@ public final class OpenTypeSupport { @Override public Map<String, Object> getFields(MessageReference ref) throws OpenDataException { Map<String, Object> rc = super.getFields(ref); - ServerMessage m = ref.getMessage(); - ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate(); + Message m = ref.getMessage(); + ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); byte[] bytes = new byte[bodyCopy.readableBytes()]; bodyCopy.readBytes(bytes); rc.put(CompositeDataConstants.BODY, bytes); @@ -285,8 +291,8 @@ public final class OpenTypeSupport { @Override public Map<String, Object> getFields(MessageReference ref) throws OpenDataException { Map<String, Object> rc = super.getFields(ref); - ServerMessage m = ref.getMessage(); - SimpleString text = m.getBodyBuffer().copy().readNullableSimpleString(); + Message m = ref.getMessage(); + SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString(); rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : ""); return rc; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java index 9b1e243..b3d8adb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java @@ -16,9 +16,10 @@ */ package org.apache.activemq.artemis.core.paging; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.server.ServerMessage; /** * A Paged message. @@ -28,7 +29,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage; */ public interface PagedMessage extends EncodingSupport { - ServerMessage getMessage(); + Message getMessage(); /** * The queues that were routed during paging http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 5ead1a2..a7de713 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -20,13 +20,15 @@ import java.io.File; import java.util.Collection; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RefCountMessageListener; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -41,7 +43,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; * * @see PagingManager */ -public interface PagingStore extends ActiveMQComponent { +public interface PagingStore extends ActiveMQComponent, RefCountMessageListener { SimpleString getAddress(); @@ -90,7 +92,7 @@ public interface PagingStore extends ActiveMQComponent { * needs to be sent to the journal * @throws NullPointerException if {@code readLock} is null */ - boolean page(ServerMessage message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception; + boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception; Page createPage(final int page) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 768b43f..823eef4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -20,11 +20,11 @@ import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.Message; + import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.jboss.logging.Logger; @@ -41,7 +41,7 @@ public class PagedReferenceImpl implements PagedReference { private int persistedCount; - private int messageEstimate; + private int messageEstimate = -1; private Long consumerId; @@ -64,7 +64,7 @@ public class PagedReferenceImpl implements PagedReference { } @Override - public ServerMessage getMessage() { + public Message getMessage() { return getPagedMessage().getMessage(); } @@ -93,12 +93,6 @@ public class PagedReferenceImpl implements PagedReference { final PagedMessage message, final PageSubscription subscription) { this.position = position; - - if (message == null) { - this.messageEstimate = -1; - } else { - this.messageEstimate = message.getMessage().getMemoryEstimate(); - } this.message = new WeakReference<>(message); this.subscription = subscription; } @@ -120,7 +114,7 @@ public class PagedReferenceImpl implements PagedReference { @Override public int getMessageMemoryEstimate() { - if (messageEstimate < 0) { + if (messageEstimate <= 0) { try { messageEstimate = getMessage().getMemoryEstimate(); } catch (Throwable e) { @@ -139,7 +133,7 @@ public class PagedReferenceImpl implements PagedReference { public long getScheduledDeliveryTime() { if (deliveryTime == null) { try { - ServerMessage msg = getMessage(); + Message msg = getMessage(); if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index c40d20d..ab10eb4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; @@ -50,7 +51,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; @@ -772,7 +772,7 @@ final class PageSubscriptionImpl implements PageSubscription { // Protected ----------------------------------------------------- - private boolean match(final ServerMessage message) { + private boolean match(final Message message) { if (filter == null) { return true; } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 4993d0c..aabec54 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -132,7 +132,7 @@ public final class Page implements Comparable<Page> { int messageSize = fileBuffer.readInt(); int oldPos = fileBuffer.readerIndex(); if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) { - PagedMessage msg = new PagedMessageImpl(); + PagedMessage msg = new PagedMessageImpl(storageManager); msg.decode(fileBuffer); byte b = fileBuffer.readByte(); if (b != Page.END_BYTE) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java index e40d107..d50dd2e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java @@ -19,12 +19,12 @@ package org.apache.activemq.artemis.core.paging.impl; import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.DataConstants; /** @@ -38,39 +38,37 @@ public class PagedMessageImpl implements PagedMessage { */ private byte[] largeMessageLazyData; - private ServerMessage message; + private Message message; private long[] queueIDs; private long transactionID = 0; - public PagedMessageImpl(final ServerMessage message, final long[] queueIDs, final long transactionID) { + private volatile StorageManager storageManager; + + public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) { this(message, queueIDs); this.transactionID = transactionID; } - public PagedMessageImpl(final ServerMessage message, final long[] queueIDs) { + public PagedMessageImpl(final Message message, final long[] queueIDs) { this.queueIDs = queueIDs; this.message = message; } - public PagedMessageImpl() { + public PagedMessageImpl(StorageManager storageManager) { + this.storageManager = storageManager; } @Override - public ServerMessage getMessage() { + public Message getMessage() { return message; } @Override public void initMessage(StorageManager storage) { if (largeMessageLazyData != null) { - LargeServerMessage lgMessage = storage.createLargeMessage(); - ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(largeMessageLazyData); - lgMessage.decodeHeadersAndProperties(buffer); - lgMessage.incrementDelayDeletionCount(); - lgMessage.setPaged(); - message = lgMessage; + // TODO-now: use the largeMessagePersister largeMessageLazyData = null; } } @@ -96,15 +94,15 @@ public class PagedMessageImpl implements PagedMessage { if (isLargeMessage) { int largeMessageHeaderSize = buffer.readInt(); - largeMessageLazyData = new byte[largeMessageHeaderSize]; - - buffer.readBytes(largeMessageLazyData); + if (storageManager == null) { + largeMessageLazyData = new byte[largeMessageHeaderSize]; + buffer.readBytes(largeMessageLazyData); + } else { + this.message = storageManager.createLargeMessage(); + LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message); + } } else { - buffer.readInt(); // This value is only used on LargeMessages for now - - message = new ServerMessageImpl(-1, 50); - - message.decode(buffer); + this.message = MessagePersister.getInstance().decode(buffer, null); } int queueIDsSize = buffer.readInt(); @@ -120,11 +118,16 @@ public class PagedMessageImpl implements PagedMessage { public void encode(final ActiveMQBuffer buffer) { buffer.writeLong(transactionID); - buffer.writeBoolean(message instanceof LargeServerMessage); + boolean isLargeMessage = isLargeMessage(); - buffer.writeInt(message.getEncodeSize()); + buffer.writeBoolean(isLargeMessage); - message.encode(buffer); + if (isLargeMessage) { + buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message)); + LargeMessagePersister.getInstance().encode(buffer, (LargeServerMessage) message); + } else { + message.getPersister().encode(buffer, message); + } buffer.writeInt(queueIDs.length); @@ -133,10 +136,19 @@ public class PagedMessageImpl implements PagedMessage { } } + private boolean isLargeMessage() { + return message.isLargeMessage(); + } + @Override public int getEncodeSize() { - return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() + - DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG; + if (isLargeMessage()) { + return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message) + + DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG; + } else { + return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + message.getPersister().getEncodeSize(message) + + DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG; + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 4e57c85..e39fe40 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -54,7 +55,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; + +import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -699,7 +701,6 @@ public class PagingStoreImpl implements PagingStore { @Override public void addSize(final int size) { - boolean globalFull = pagingManager.addSize(size).isGlobalFull(); long newSize = sizeInBytes.addAndGet(size); @@ -747,7 +748,7 @@ public class PagingStoreImpl implements PagingStore { } @Override - public boolean page(ServerMessage message, + public boolean page(Message message, final Transaction tx, RouteContextList listCtx, final ReadLock managerLock) throws Exception { @@ -806,11 +807,12 @@ public class PagingStoreImpl implements PagingStore { return false; } - if (!message.isDurable()) { - // The address should never be transient when paging (even for non-persistent messages when paging) - // This will force everything to be persisted - message.forceAddress(address); - } + message.setAddress(address); +// if (!message.isDurable()) { +// // The address should never be transient when paging (even for non-persistent messages when paging) +// // This will force everything to be persisted +// message.forceAddress(address); +// } final long transactionID = tx == null ? -1 : tx.getID(); PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID); @@ -920,6 +922,40 @@ public class PagingStoreImpl implements PagingStore { } + @Override + public void durableDown(Message message, int durableCount) { + } + + @Override + public void durableUp(Message message, int durableCount) { + } + + @Override + public void nonDurableUp(Message message, int count) { + if (count == 1) { + this.addSize(message.getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate()); + } else { + this.addSize(MessageReferenceImpl.getMemoryEstimate()); + } + } + + @Override + public void nonDurableDown(Message message, int count) { + if (count < 0) { + // this could happen on paged messages since they are not routed and incrementRefCount is never called + return; + } + + if (count == 0) { + this.addSize(-message.getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate()); + + } else { + this.addSize(-MessageReferenceImpl.getMemoryEstimate()); + } + + + } + private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception { FinishPageMessageOperation pgOper = (FinishPageMessageOperation) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION); if (pgOper == null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index b45775c..e27ed30 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -23,13 +23,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -172,7 +171,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { */ void confirmPendingLargeMessage(long recordID) throws Exception; - void storeMessage(ServerMessage message) throws Exception; + void storeMessage(Message message) throws Exception; void storeReference(long queueID, long messageID, boolean last) throws Exception; @@ -190,7 +189,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void deleteDuplicateID(long recordID) throws Exception; - void storeMessageTransactional(long txID, ServerMessage message) throws Exception; + void storeMessageTransactional(long txID, Message message) throws Exception; void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception; @@ -225,7 +224,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { * @return a large message object * @throws Exception */ - LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception; + LargeServerMessage createLargeMessage(long id, Message message) throws Exception; enum LargeMessageExtension { DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync"); @@ -265,11 +264,6 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception; - /** - * FIXME Unused - */ - void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception; - void deletePageTransactional(long recordID) throws Exception; JournalLoadInformation loadMessageJournal(final PostOffice postOffice, @@ -383,7 +377,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { * needs to be sent to the journal * @throws Exception */ - boolean addToPage(PagingStore store, ServerMessage msg, Transaction tx, RouteContextList listCtx) throws Exception; + boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception; /** * Stops the replication of data from the live to the backup.
