http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java new file mode 100644 index 0000000..5793d58 --- /dev/null +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.openwire; + +import java.util.Set; + +import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RefCountMessageListener; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.Persister; + +// TODO: Implement this +public class OpenwireMessage implements Message { + + @Override + public boolean containsProperty(SimpleString key) { + return false; + } + + @Override + public void messageChanged() { + + } + + @Override + public RoutingType getRouteType() { + return null; + } + + @Override + public SimpleString getReplyTo() { + return null; + } + + @Override + public Message setReplyTo(SimpleString address) { + return null; + } + + @Override + public boolean containsDeliveryAnnotationProperty(SimpleString property) { + return false; + } + + @Override + public Object removeDeliveryAnnoationProperty(SimpleString key) { + return null; + } + + @Override + public Object getDeliveryAnnotationProperty(SimpleString key) { + return null; + } + + @Override + public Long getScheduledDeliveryTime() { + return null; + } + + @Override + public RefCountMessageListener getContext() { + return null; + } + + @Override + public Message setContext(RefCountMessageListener context) { + return null; + } + + @Override + public Message setBuffer(ByteBuf buffer) { + return null; + } + + @Override + public ByteBuf getBuffer() { + return null; + } + + @Override + public Message copy() { + return null; + } + + @Override + public Message copy(long newID) { + return null; + } + + @Override + public long getMessageID() { + return 0; + } + + @Override + public Message setMessageID(long id) { + return null; + } + + @Override + public long getExpiration() { + return 0; + } + + @Override + public Message setExpiration(long expiration) { + return null; + } + + @Override + public Object getUserID() { + return null; + } + + @Override + public Message setUserID(Object userID) { + return null; + } + + @Override + public boolean isDurable() { + return false; + } + + @Override + public Message setDurable(boolean durable) { + return null; + } + + @Override + public Persister<Message> getPersister() { + return null; + } + + @Override + public String getAddress() { + return null; + } + + @Override + public Message setAddress(String address) { + return null; + } + + @Override + public SimpleString getAddressSimpleString() { + return null; + } + + @Override + public Message setAddress(SimpleString address) { + return null; + } + + @Override + public long getTimestamp() { + return 0; + } + + @Override + public Message setTimestamp(long timestamp) { + return null; + } + + @Override + public byte getPriority() { + return 0; + } + + @Override + public Message setPriority(byte priority) { + return null; + } + + @Override + public void receiveBuffer(ByteBuf buffer) { + + } + + @Override + public void sendBuffer(ByteBuf buffer, int deliveryCount) { + + } + + @Override + public int getPersistSize() { + return 0; + } + + @Override + public void persist(ActiveMQBuffer targetRecord) { + + } + + @Override + public void reloadPersistence(ActiveMQBuffer record) { + + } + + @Override + public Message putBooleanProperty(String key, boolean value) { + return null; + } + + @Override + public Message putByteProperty(String key, byte value) { + return null; + } + + @Override + public Message putBytesProperty(String key, byte[] value) { + return null; + } + + @Override + public Message putShortProperty(String key, short value) { + return null; + } + + @Override + public Message putCharProperty(String key, char value) { + return null; + } + + @Override + public Message putIntProperty(String key, int value) { + return null; + } + + @Override + public Message putLongProperty(String key, long value) { + return null; + } + + @Override + public Message putFloatProperty(String key, float value) { + return null; + } + + @Override + public Message putDoubleProperty(String key, double value) { + return null; + } + + @Override + public Message putBooleanProperty(SimpleString key, boolean value) { + return null; + } + + @Override + public Message putByteProperty(SimpleString key, byte value) { + return null; + } + + @Override + public Message putBytesProperty(SimpleString key, byte[] value) { + return null; + } + + @Override + public Message putShortProperty(SimpleString key, short value) { + return null; + } + + @Override + public Message putCharProperty(SimpleString key, char value) { + return null; + } + + @Override + public Message putIntProperty(SimpleString key, int value) { + return null; + } + + @Override + public Message putLongProperty(SimpleString key, long value) { + return null; + } + + @Override + public Message putFloatProperty(SimpleString key, float value) { + return null; + } + + @Override + public Message putDoubleProperty(SimpleString key, double value) { + return null; + } + + @Override + public Message putStringProperty(String key, String value) { + return null; + } + + @Override + public Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Object removeProperty(String key) { + return null; + } + + @Override + public boolean containsProperty(String key) { + return false; + } + + @Override + public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Long getLongProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Object getObjectProperty(String key) { + return null; + } + + @Override + public Short getShortProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public String getStringProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException { + return new byte[0]; + } + + @Override + public Object removeProperty(SimpleString key) { + return null; + } + + @Override + public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Object getObjectProperty(SimpleString key) { + return null; + } + + @Override + public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return new byte[0]; + } + + @Override + public Message putStringProperty(SimpleString key, SimpleString value) { + return null; + } + + @Override + public int getEncodeSize() { + return 0; + } + + @Override + public Set<SimpleString> getPropertyNames() { + return null; + } + + @Override + public int getRefCount() { + return 0; + } + + @Override + public int incrementRefCount() throws Exception { + return 0; + } + + @Override + public int decrementRefCount() throws Exception { + return 0; + } + + @Override + public int incrementDurableRefCount() { + return 0; + } + + @Override + public int decrementDurableRefCount() { + return 0; + } + + @Override + public ICoreMessage toCore() { + return null; + } + + @Override + public int getMemoryEstimate() { + return 0; + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index f471a2a..3bdee8b 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -27,15 +27,16 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -208,7 +209,7 @@ public class AMQConsumer { } - public int handleDeliver(MessageReference reference, ServerMessage message, int deliveryCount) { + public int handleDeliver(MessageReference reference, ICoreMessage message, int deliveryCount) { MessageDispatch dispatch; try { if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) { @@ -394,7 +395,7 @@ public class AMQConsumer { } } - public boolean checkForcedConsumer(ServerMessage message) { + public boolean checkForcedConsumer(Message message) { if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) { if (next >= 0) { if (timeout <= 0) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 79004ae..b5d2c86 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; @@ -34,9 +35,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.reader.MessageUtil; @@ -231,16 +230,17 @@ public class AMQSession implements SessionCallback { @Override public int sendMessage(MessageReference reference, - ServerMessage message, + org.apache.activemq.artemis.api.core.Message message, ServerConsumer consumer, int deliveryCount) { AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); - return theConsumer.handleDeliver(reference, message, deliveryCount); + // TODO: use encoders and proper conversions here + return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount); } @Override public int sendLargeMessage(MessageReference reference, - ServerMessage message, + org.apache.activemq.artemis.api.core.Message message, ServerConsumer consumerID, long bodySize, int deliveryCount) { @@ -296,7 +296,7 @@ public class AMQSession implements SessionCallback { actualDestinations = new ActiveMQDestination[]{destination}; } - ServerMessage originalCoreMsg = getConverter().inbound(messageSend); + org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend); if (connection.isNoLocal()) { //Note: advisory messages are dealt with in @@ -324,7 +324,7 @@ public class AMQSession implements SessionCallback { for (int i = 0; i < actualDestinations.length; i++) { ActiveMQDestination dest = actualDestinations[i]; SimpleString address = new SimpleString(dest.getPhysicalName()); - ServerMessage coreMsg = originalCoreMsg.copy(); + org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy(); coreMsg.setAddress(address); if (actualDestinations[i].isQueue()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java index 5355c63..c84776b 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java @@ -18,8 +18,9 @@ package org.apache.activemq.artemis.core.protocol.openwire.util; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.config.WildcardConfiguration; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -53,9 +54,14 @@ public class OpenWireUtil { * set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the * consumer */ - public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) { - String address = message.getAddress().toString(); + public static ActiveMQDestination toAMQAddress(Message message, ActiveMQDestination actualDestination) { + String address = message.getAddress(); String strippedAddress = address;//.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, ""); + + if (address == null) { + return actualDestination; + } + if (actualDestination.isQueue()) { return new ActiveMQQueue(strippedAddress); } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java index 861c524..d377abd 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol.stomp; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.jboss.logging.Messages; import org.jboss.logging.annotations.Cause; import org.jboss.logging.annotations.Message; @@ -71,7 +70,7 @@ public interface ActiveMQStompProtocolMessageBundle { ActiveMQStompException invalidConnection(); @Message(id = 339011, value = "Error sending message {0}", format = Message.Format.MESSAGE_FORMAT) - ActiveMQStompException errorSendMessage(ServerMessageImpl message, @Cause Exception e); + ActiveMQStompException errorSendMessage(org.apache.activemq.artemis.api.core.Message message, @Cause Exception e); @Message(id = 339012, value = "Error beginning a transaction {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQStompException errorBeginTx(String txID, @Cause Exception e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index c004a0e..56067f1 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -30,18 +30,18 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; @@ -569,7 +569,7 @@ public final class StompConnection implements RemotingConnection { return valid; } - public ServerMessageImpl createServerMessage() { + public CoreMessage createServerMessage() { return manager.createServerMessage(); } @@ -598,7 +598,7 @@ public final class StompConnection implements RemotingConnection { } } - protected void sendServerMessage(ServerMessageImpl message, String txID) throws ActiveMQStompException { + protected void sendServerMessage(ICoreMessage message, String txID) throws ActiveMQStompException { StompSession stompSession = getSession(txID); if (stompSession.isNoLocal()) { @@ -611,7 +611,7 @@ public final class StompConnection implements RemotingConnection { if (minLargeMessageSize == -1 || (message.getBodyBuffer().writerIndex() < minLargeMessageSize)) { stompSession.sendInternal(message, false); } else { - stompSession.sendInternalLarge(message, false); + stompSession.sendInternalLarge((CoreMessage)message, false); } } catch (Exception e) { throw BUNDLE.errorSendMessage(message, e).setHandler(frameHandler); @@ -726,10 +726,11 @@ public final class StompConnection implements RemotingConnection { return SERVER_NAME; } - public StompFrame createStompMessage(ServerMessage serverMessage, + public StompFrame createStompMessage(ICoreMessage serverMessage, + ActiveMQBuffer bodyBuffer, StompSubscription subscription, int deliveryCount) throws Exception { - return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount); + return frameHandler.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount); } public void addStompEventListener(FrameEventListener listener) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 54339a4..39d2fe9 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -33,15 +33,14 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; @@ -109,13 +108,6 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St } @Override - public MessageConverter getConverter() { - return null; - } - - // ProtocolManager implementation -------------------------------- - - @Override public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) { StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getScheduledPool(), server.getExecutorFactory()); @@ -345,8 +337,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St return validated; } - public ServerMessageImpl createServerMessage() { - return new ServerMessageImpl(server.getStorageManager().generateID(), 512); + public CoreMessage createServerMessage() { + return new CoreMessage(server.getStorageManager().generateID(), 512); } public void commitTransaction(StompConnection connection, String txID) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 1e103e9..797a966 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -25,23 +25,24 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.zip.Inflater; +import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.BodyEncoder; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -127,32 +128,35 @@ public class StompSession implements SessionCallback { @Override public int sendMessage(MessageReference ref, - ServerMessage serverMessage, + Message serverMessage, final ServerConsumer consumer, int deliveryCount) { + + ICoreMessage coreMessage = serverMessage.toCore(); + LargeServerMessageImpl largeMessage = null; - ServerMessage newServerMessage = serverMessage; + ICoreMessage newServerMessage = serverMessage.toCore(); try { StompSubscription subscription = subscriptions.get(consumer.getID()); - StompFrame frame = null; - if (serverMessage.isLargeMessage()) { - newServerMessage = serverMessage.copy(); + StompFrame frame; + ActiveMQBuffer buffer; - largeMessage = (LargeServerMessageImpl) serverMessage; - BodyEncoder encoder = largeMessage.getBodyEncoder(); + if (coreMessage.isLargeMessage()) { + LargeBodyEncoder encoder = coreMessage.getBodyEncoder(); encoder.open(); int bodySize = (int) encoder.getLargeBodySize(); - //large message doesn't have a body. - ((ServerMessageImpl) newServerMessage).createBody(bodySize); - encoder.encode(newServerMessage.getBodyBuffer(), bodySize); + buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize)); + + encoder.encode(buffer, bodySize); encoder.close(); + } else { + buffer = coreMessage.getReadOnlyBodyBuffer(); } if (serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) { - //decompress - ActiveMQBuffer qbuff = newServerMessage.getBodyBuffer(); - int bytesToRead = qbuff.writerIndex() - MessageImpl.BODY_OFFSET; + ActiveMQBuffer qbuff = buffer; + int bytesToRead = qbuff.readerIndex(); Inflater inflater = new Inflater(); inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer())); @@ -165,9 +169,10 @@ public class StompSession implements SessionCallback { qbuff.resetReaderIndex(); qbuff.resetWriterIndex(); qbuff.writeBytes(data); + buffer = qbuff; } - frame = connection.createStompMessage(newServerMessage, subscription, deliveryCount); + frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount); int length = frame.getEncodedSize(); @@ -219,7 +224,7 @@ public class StompSession implements SessionCallback { @Override public int sendLargeMessage(MessageReference ref, - ServerMessage msg, + Message msg, ServerConsumer consumer, long bodySize, int deliveryCount) { @@ -370,11 +375,11 @@ public class StompSession implements SessionCallback { this.noLocal = noLocal; } - public void sendInternal(ServerMessageImpl message, boolean direct) throws Exception { + public void sendInternal(Message message, boolean direct) throws Exception { session.send(message, direct); } - public void sendInternalLarge(ServerMessageImpl message, boolean direct) throws Exception { + public void sendInternalLarge(CoreMessage message, boolean direct) throws Exception { int headerSize = message.getHeadersAndPropertiesEncodeSize(); if (headerSize >= connection.getMinLargeMessageSize()) { throw BUNDLE.headerTooBig(); @@ -384,7 +389,7 @@ public class StompSession implements SessionCallback { long id = storageManager.generateID(); LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message); - byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - MessageImpl.BODY_OFFSET]; + byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET]; message.getBodyBuffer().readBytes(bytes); largeMessage.addBytes(bytes); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java index affab84..7db9d82 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java @@ -24,8 +24,6 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.reader.MessageUtil; public class StompUtils { @@ -37,7 +35,7 @@ public class StompUtils { // Static -------------------------------------------------------- - public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception { + public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg) throws Exception { Map<String, String> headers = new HashMap<>(frame.getHeadersMap()); String priority = headers.remove(Stomp.Headers.Send.PRIORITY); @@ -79,7 +77,7 @@ public class StompUtils { } } - public static void copyStandardHeadersFromMessageToFrame(MessageInternal message, + public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame command, int deliveryCount) throws Exception { command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID())); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index f91ba82..3f68c6f 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -20,17 +20,15 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers; import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.ExecutorFactory; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; @@ -180,7 +178,7 @@ public abstract class VersionedStompFrameHandler { long timestamp = System.currentTimeMillis(); - ServerMessageImpl message = connection.createServerMessage(); + CoreMessage message = connection.createServerMessage(); if (routingType != null) { message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType()); } @@ -289,7 +287,8 @@ public abstract class VersionedStompFrameHandler { return response; } - public StompFrame createMessageFrame(ServerMessage serverMessage, + public StompFrame createMessageFrame(ICoreMessage serverMessage, + ActiveMQBuffer bodyBuffer, StompSubscription subscription, int deliveryCount) throws Exception { StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE); @@ -298,13 +297,11 @@ public abstract class VersionedStompFrameHandler { frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID()); } - ActiveMQBuffer buffer = serverMessage.getBodyBufferDuplicate(); - - int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition(); + ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer(); - buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT); + int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition(); - int size = bodyPos - buffer.readerIndex(); + int size = buffer.writerIndex(); byte[] data = new byte[size]; @@ -321,7 +318,7 @@ public abstract class VersionedStompFrameHandler { } frame.setByteBody(data); - StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount); + StompUtils.copyStandardHeadersFromMessageToFrame((serverMessage), frame, deliveryCount); return frame; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java index 6b211d2..77a9225 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; @@ -27,7 +29,6 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompSubscription; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.utils.ExecutorFactory; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; @@ -48,10 +49,11 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 { } @Override - public StompFrame createMessageFrame(ServerMessage serverMessage, + public StompFrame createMessageFrame(ICoreMessage serverMessage, + ActiveMQBuffer bodyBuffer, StompSubscription subscription, int deliveryCount) throws Exception { - StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount); + StompFrame frame = super.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount); if (!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) { frame.addHeader(Stomp.Headers.Message.ACK, String.valueOf(serverMessage.getMessageID())); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 7881470..30d6668 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -750,10 +750,6 @@ public interface Configuration { Configuration setLogJournalWriteRate(boolean rate); - int getJournalPerfBlastPages(); - - Configuration setJournalPerfBlastPages(int pages); - long getServerDumpInterval(); Configuration setServerDumpInterval(long interval); @@ -766,10 +762,6 @@ public interface Configuration { Configuration setMemoryMeasureInterval(long memoryMeasureInterval); - boolean isRunSyncSpeedTest(); - - Configuration setRunSyncSpeedTest(boolean run); - // Paging Properties -------------------------------------------------------------------- /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index f4eda91..329f654 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -193,10 +193,6 @@ public class ConfigurationImpl implements Configuration, Serializable { protected boolean logJournalWriteRate = ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate(); - protected int journalPerfBlastPages = ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(); - - protected boolean runSyncSpeedTest = ActiveMQDefaultConfiguration.isDefaultRunSyncSpeedTest(); - private WildcardConfiguration wildcardConfiguration = new WildcardConfiguration(); private boolean messageCounterEnabled = ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled(); @@ -854,28 +850,6 @@ public class ConfigurationImpl implements Configuration, Serializable { } @Override - public int getJournalPerfBlastPages() { - return journalPerfBlastPages; - } - - @Override - public ConfigurationImpl setJournalPerfBlastPages(final int journalPerfBlastPages) { - this.journalPerfBlastPages = journalPerfBlastPages; - return this; - } - - @Override - public boolean isRunSyncSpeedTest() { - return runSyncSpeedTest; - } - - @Override - public ConfigurationImpl setRunSyncSpeedTest(final boolean run) { - runSyncSpeedTest = run; - return this; - } - - @Override public boolean isCreateBindingsDir() { return createBindingsDir; } @@ -1556,7 +1530,6 @@ public class ConfigurationImpl implements Configuration, Serializable { result = prime * result + journalMaxIO_AIO; result = prime * result + journalMaxIO_NIO; result = prime * result + journalMinFiles; - result = prime * result + journalPerfBlastPages; result = prime * result + (journalSyncNonTransactional ? 1231 : 1237); result = prime * result + (journalSyncTransactional ? 1231 : 1237); result = prime * result + ((journalType == null) ? 0 : journalType.hashCode()); @@ -1580,7 +1553,6 @@ public class ConfigurationImpl implements Configuration, Serializable { result = prime * result + (persistIDCache ? 1231 : 1237); result = prime * result + (persistenceEnabled ? 1231 : 1237); result = prime * result + ((queueConfigurations == null) ? 0 : queueConfigurations.hashCode()); - result = prime * result + (runSyncSpeedTest ? 1231 : 1237); result = prime * result + scheduledThreadPoolMaxSize; result = prime * result + (securityEnabled ? 1231 : 1237); result = prime * result + (populateValidatedUser ? 1231 : 1237); @@ -1723,8 +1695,6 @@ public class ConfigurationImpl implements Configuration, Serializable { return false; if (journalMinFiles != other.journalMinFiles) return false; - if (journalPerfBlastPages != other.journalPerfBlastPages) - return false; if (journalSyncNonTransactional != other.journalSyncNonTransactional) return false; if (journalSyncTransactional != other.journalSyncTransactional) @@ -1793,8 +1763,6 @@ public class ConfigurationImpl implements Configuration, Serializable { return false; } else if (!queueConfigurations.equals(other.queueConfigurations)) return false; - if (runSyncSpeedTest != other.runSyncSpeedTest) - return false; if (scheduledThreadPoolMaxSize != other.scheduledThreadPoolMaxSize) return false; if (securityEnabled != other.securityEnabled) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index cea0598..4055b5c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -548,10 +548,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setLogJournalWriteRate(getBoolean(e, "log-journal-write-rate", ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate())); - config.setJournalPerfBlastPages(getInteger(e, "perf-blast-pages", ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), Validators.MINUS_ONE_OR_GT_ZERO)); - - config.setRunSyncSpeedTest(getBoolean(e, "run-sync-speed-test", config.isRunSyncSpeedTest())); - if (e.hasAttribute("wild-card-routing-enabled")) { config.setWildcardRoutingEnabled(getBoolean(e, "wild-card-routing-enabled", config.isWildcardRoutingEnabled())); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java index 41d5e54..3737e19 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java @@ -16,8 +16,8 @@ */ package org.apache.activemq.artemis.core.filter; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerMessage; public interface Filter { @@ -31,7 +31,7 @@ public interface Filter { */ String GENERIC_IGNORED_FILTER = "__AMQX=-1"; - boolean match(ServerMessage message); + boolean match(Message message); SimpleString getFilterString(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java index 0a459c9..33a1187 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java @@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.filter.impl; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.FilterConstants; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.selector.filter.BooleanExpression; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.filter.Filterable; @@ -103,7 +103,7 @@ public class FilterImpl implements Filter { } @Override - public synchronized boolean match(final ServerMessage message) { + public synchronized boolean match(final Message message) { try { boolean result = booleanExpression.matches(new FilterableServerMessage(message)); return result; @@ -148,7 +148,7 @@ public class FilterImpl implements Filter { // Private -------------------------------------------------------------------------- - private static Object getHeaderFieldValue(final ServerMessage msg, final SimpleString fieldName) { + private static Object getHeaderFieldValue(final Message msg, final SimpleString fieldName) { if (FilterConstants.ACTIVEMQ_USERID.equals(fieldName)) { if (msg.getUserID() == null) { // Proton stores JMSMessageID as NATIVE_MESSAGE_ID that is an arbitrary string @@ -158,7 +158,12 @@ public class FilterImpl implements Filter { } } // It's the stringified (hex) representation of a user id that can be used in a selector expression - return new SimpleString("ID:" + msg.getUserID()); + String userID = msg.getUserID().toString(); + if (userID.startsWith("ID:")) { + return SimpleString.toSimpleString(userID); + } else { + return new SimpleString("ID:" + msg.getUserID()); + } } else if (FilterConstants.ACTIVEMQ_PRIORITY.equals(fieldName)) { return Integer.valueOf(msg.getPriority()); } else if (FilterConstants.ACTIVEMQ_TIMESTAMP.equals(fieldName)) { @@ -178,9 +183,9 @@ public class FilterImpl implements Filter { private static class FilterableServerMessage implements Filterable { - private final ServerMessage message; + private final Message message; - private FilterableServerMessage(ServerMessage message) { + private FilterableServerMessage(Message message) { this.message = message; } @@ -191,7 +196,7 @@ public class FilterImpl implements Filter { result = getHeaderFieldValue(message, new SimpleString(id)); } if (result == null) { - result = message.getObjectProperty(new SimpleString(id)); + result = message.getObjectProperty(id); } if (result != null) { if (result.getClass() == SimpleString.class) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index 09dd702..31e056c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -25,10 +25,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -40,9 +42,7 @@ import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityStore; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -282,7 +282,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro return null; } }); - ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50); + CoreMessage message = new CoreMessage(storageManager.generateID(), 50); for (String header : headers.keySet()) { message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header))); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 4b84909..5ecea64 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -39,7 +39,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.messagecounter.MessageCounter; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -53,8 +53,6 @@ import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -609,7 +607,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { try { Filter singleMessageFilter = new Filter() { @Override - public boolean match(ServerMessage message) { + public boolean match(Message message) { return message.getMessageID() == messageID; } @@ -738,7 +736,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { return null; } }); - ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50); + CoreMessage message = new CoreMessage(storageManager.generateID(), 50); for (String header : headers.keySet()) { message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header))); } @@ -755,7 +753,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { message.setAddress(queue.getAddress()); ByteBuffer buffer = ByteBuffer.allocate(8); buffer.putLong(queue.getID()); - message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); + message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array()); postOffice.route(message, true); return "" + message.getMessageID(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java index ec6848b..098c61c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java @@ -32,10 +32,10 @@ import java.util.List; import java.util.Map; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.ServerMessage; public final class OpenTypeSupport { @@ -48,8 +48,10 @@ public final class OpenTypeSupport { public static CompositeData convert(MessageReference ref) throws OpenDataException { CompositeType ct; + ICoreMessage message = ref.getMessage().toCore(); + Map<String, Object> fields; - byte type = ref.getMessage().getType(); + byte type = message.getType(); switch(type) { case Message.TEXT_TYPE: @@ -128,7 +130,7 @@ public final class OpenTypeSupport { public Map<String, Object> getFields(MessageReference ref) throws OpenDataException { Map<String, Object> rc = new HashMap<>(); - Message m = ref.getMessage(); + ICoreMessage m = ref.getMessage().toCore(); rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID()); if (m.getUserID() != null) { rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString()); @@ -143,6 +145,11 @@ public final class OpenTypeSupport { rc.put(CompositeDataConstants.PRIORITY, m.getPriority()); rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1); + ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); + byte[] bytes = new byte[bodyCopy.readableBytes()]; + bodyCopy.readBytes(bytes); + rc.put(CompositeDataConstants.BODY, bytes); + Map<String, Object> propertyMap = m.toPropertyMap(); rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap); @@ -264,8 +271,8 @@ public final class OpenTypeSupport { @Override public Map<String, Object> getFields(MessageReference ref) throws OpenDataException { Map<String, Object> rc = super.getFields(ref); - ServerMessage m = ref.getMessage(); - ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate(); + ICoreMessage m = ref.getMessage().toCore(); + ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); byte[] bytes = new byte[bodyCopy.readableBytes()]; bodyCopy.readBytes(bytes); rc.put(CompositeDataConstants.BODY, bytes); @@ -285,8 +292,8 @@ public final class OpenTypeSupport { @Override public Map<String, Object> getFields(MessageReference ref) throws OpenDataException { Map<String, Object> rc = super.getFields(ref); - ServerMessage m = ref.getMessage(); - SimpleString text = m.getBodyBuffer().copy().readNullableSimpleString(); + ICoreMessage m = ref.getMessage().toCore(); + SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString(); rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : ""); return rc; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java index 9b1e243..0124f09 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.artemis.core.paging; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.server.ServerMessage; /** * A Paged message. @@ -28,7 +28,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage; */ public interface PagedMessage extends EncodingSupport { - ServerMessage getMessage(); + Message getMessage(); /** * The queues that were routed during paging http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 5ead1a2..2d4c646 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -20,13 +20,14 @@ import java.io.File; import java.util.Collection; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RefCountMessageListener; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -41,7 +42,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; * * @see PagingManager */ -public interface PagingStore extends ActiveMQComponent { +public interface PagingStore extends ActiveMQComponent, RefCountMessageListener { SimpleString getAddress(); @@ -90,7 +91,7 @@ public interface PagingStore extends ActiveMQComponent { * needs to be sent to the journal * @throws NullPointerException if {@code readLock} is null */ - boolean page(ServerMessage message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception; + boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception; Page createPage(final int page) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 768b43f..823eef4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -20,11 +20,11 @@ import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.Message; + import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.jboss.logging.Logger; @@ -41,7 +41,7 @@ public class PagedReferenceImpl implements PagedReference { private int persistedCount; - private int messageEstimate; + private int messageEstimate = -1; private Long consumerId; @@ -64,7 +64,7 @@ public class PagedReferenceImpl implements PagedReference { } @Override - public ServerMessage getMessage() { + public Message getMessage() { return getPagedMessage().getMessage(); } @@ -93,12 +93,6 @@ public class PagedReferenceImpl implements PagedReference { final PagedMessage message, final PageSubscription subscription) { this.position = position; - - if (message == null) { - this.messageEstimate = -1; - } else { - this.messageEstimate = message.getMessage().getMemoryEstimate(); - } this.message = new WeakReference<>(message); this.subscription = subscription; } @@ -120,7 +114,7 @@ public class PagedReferenceImpl implements PagedReference { @Override public int getMessageMemoryEstimate() { - if (messageEstimate < 0) { + if (messageEstimate <= 0) { try { messageEstimate = getMessage().getMemoryEstimate(); } catch (Throwable e) { @@ -139,7 +133,7 @@ public class PagedReferenceImpl implements PagedReference { public long getScheduledDeliveryTime() { if (deliveryTime == null) { try { - ServerMessage msg = getMessage(); + Message msg = getMessage(); if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index c40d20d..ab10eb4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; @@ -50,7 +51,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; @@ -772,7 +772,7 @@ final class PageSubscriptionImpl implements PageSubscription { // Protected ----------------------------------------------------- - private boolean match(final ServerMessage message) { + private boolean match(final Message message) { if (filter == null) { return true; } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 4993d0c..7d21316 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -132,7 +133,7 @@ public final class Page implements Comparable<Page> { int messageSize = fileBuffer.readInt(); int oldPos = fileBuffer.readerIndex(); if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) { - PagedMessage msg = new PagedMessageImpl(); + PagedMessage msg = new PagedMessageImpl(storageManager); msg.decode(fileBuffer); byte b = fileBuffer.readByte(); if (b != Page.END_BYTE) { @@ -255,7 +256,7 @@ public final class Page implements Comparable<Page> { if (messages != null) { for (PagedMessage msg : messages) { - if (msg.getMessage().isLargeMessage()) { + if (msg.getMessage() instanceof ICoreMessage && (msg.getMessage()).isLargeMessage()) { LargeServerMessage lmsg = (LargeServerMessage) msg.getMessage(); // Remember, cannot call delete directly here http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java index e40d107..b770623 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java @@ -20,11 +20,13 @@ import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.DataConstants; /** @@ -38,27 +40,30 @@ public class PagedMessageImpl implements PagedMessage { */ private byte[] largeMessageLazyData; - private ServerMessage message; + private Message message; private long[] queueIDs; private long transactionID = 0; - public PagedMessageImpl(final ServerMessage message, final long[] queueIDs, final long transactionID) { + private volatile StorageManager storageManager; + + public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) { this(message, queueIDs); this.transactionID = transactionID; } - public PagedMessageImpl(final ServerMessage message, final long[] queueIDs) { + public PagedMessageImpl(final Message message, final long[] queueIDs) { this.queueIDs = queueIDs; this.message = message; } - public PagedMessageImpl() { + public PagedMessageImpl(StorageManager storageManager) { + this.storageManager = storageManager; } @Override - public ServerMessage getMessage() { + public Message getMessage() { return message; } @@ -66,11 +71,11 @@ public class PagedMessageImpl implements PagedMessage { public void initMessage(StorageManager storage) { if (largeMessageLazyData != null) { LargeServerMessage lgMessage = storage.createLargeMessage(); - ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(largeMessageLazyData); - lgMessage.decodeHeadersAndProperties(buffer); - lgMessage.incrementDelayDeletionCount(); - lgMessage.setPaged(); - message = lgMessage; + + ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(largeMessageLazyData); + LargeMessagePersister.getInstance().decode(buffer, lgMessage); + ((LargeServerMessage) message).incrementDelayDeletionCount(); + this.message = lgMessage; largeMessageLazyData = null; } } @@ -96,15 +101,16 @@ public class PagedMessageImpl implements PagedMessage { if (isLargeMessage) { int largeMessageHeaderSize = buffer.readInt(); - largeMessageLazyData = new byte[largeMessageHeaderSize]; - - buffer.readBytes(largeMessageLazyData); + if (storageManager == null) { + largeMessageLazyData = new byte[largeMessageHeaderSize]; + buffer.readBytes(largeMessageLazyData); + } else { + this.message = storageManager.createLargeMessage(); + LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message); + ((LargeServerMessage) message).incrementDelayDeletionCount(); + } } else { - buffer.readInt(); // This value is only used on LargeMessages for now - - message = new ServerMessageImpl(-1, 50); - - message.decode(buffer); + this.message = MessagePersister.getInstance().decode(buffer, null); } int queueIDsSize = buffer.readInt(); @@ -120,11 +126,16 @@ public class PagedMessageImpl implements PagedMessage { public void encode(final ActiveMQBuffer buffer) { buffer.writeLong(transactionID); - buffer.writeBoolean(message instanceof LargeServerMessage); + boolean isLargeMessage = isLargeMessage(); - buffer.writeInt(message.getEncodeSize()); + buffer.writeBoolean(isLargeMessage); - message.encode(buffer); + if (isLargeMessage) { + buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message)); + LargeMessagePersister.getInstance().encode(buffer, (LargeServerMessage) message); + } else { + message.getPersister().encode(buffer, message); + } buffer.writeInt(queueIDs.length); @@ -133,10 +144,19 @@ public class PagedMessageImpl implements PagedMessage { } } + public boolean isLargeMessage() { + return message instanceof ICoreMessage && ((ICoreMessage)message).isLargeMessage(); + } + @Override public int getEncodeSize() { - return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() + - DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG; + if (isLargeMessage()) { + return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message) + + DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG; + } else { + return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + message.getPersister().getEncodeSize(message) + + DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG; + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 4e57c85..a8e2190 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -54,7 +55,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -699,7 +700,6 @@ public class PagingStoreImpl implements PagingStore { @Override public void addSize(final int size) { - boolean globalFull = pagingManager.addSize(size).isGlobalFull(); long newSize = sizeInBytes.addAndGet(size); @@ -747,7 +747,7 @@ public class PagingStoreImpl implements PagingStore { } @Override - public boolean page(ServerMessage message, + public boolean page(Message message, final Transaction tx, RouteContextList listCtx, final ReadLock managerLock) throws Exception { @@ -806,11 +806,7 @@ public class PagingStoreImpl implements PagingStore { return false; } - if (!message.isDurable()) { - // The address should never be transient when paging (even for non-persistent messages when paging) - // This will force everything to be persisted - message.forceAddress(address); - } + message.setAddress(address); final long transactionID = tx == null ? -1 : tx.getID(); PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID); @@ -920,6 +916,40 @@ public class PagingStoreImpl implements PagingStore { } + @Override + public void durableDown(Message message, int durableCount) { + } + + @Override + public void durableUp(Message message, int durableCount) { + } + + @Override + public void nonDurableUp(Message message, int count) { + if (count == 1) { + this.addSize(message.getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate()); + } else { + this.addSize(MessageReferenceImpl.getMemoryEstimate()); + } + } + + @Override + public void nonDurableDown(Message message, int count) { + if (count < 0) { + // this could happen on paged messages since they are not routed and incrementRefCount is never called + return; + } + + if (count == 0) { + this.addSize(-message.getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate()); + + } else { + this.addSize(-MessageReferenceImpl.getMemoryEstimate()); + } + + + } + private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception { FinishPageMessageOperation pgOper = (FinishPageMessageOperation) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION); if (pgOper == null) {
