http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index 896a8ed..6e28c0e 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -372,7 +372,7 @@ public class ActiveMQMessage implements javax.jms.Message { public void setJMSReplyTo(final Destination dest) throws JMSException { if (dest == null) { - MessageUtil.setJMSReplyTo(message, null); + MessageUtil.setJMSReplyTo(message, (String) null); replyTo = null; } else { if (dest instanceof ActiveMQDestination == false) { @@ -391,7 +391,7 @@ public class ActiveMQMessage implements javax.jms.Message { } ActiveMQDestination jbd = (ActiveMQDestination) dest; - MessageUtil.setJMSReplyTo(message, SimpleString.toSimpleString(prefix + jbd.getAddress())); + MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress()); replyTo = jbd; } @@ -401,14 +401,15 @@ public class ActiveMQMessage implements javax.jms.Message { public Destination getJMSDestination() throws JMSException { if (dest == null) { SimpleString address = message.getAddressSimpleString(); - String prefix = ""; - if (RoutingType.ANYCAST.equals(message.getRoutingType())) { - prefix = QUEUE_QUALIFIED_PREFIX; + if (address == null) { + dest = null; + } else if (RoutingType.ANYCAST.equals(message.getRoutingType())) { + dest = ActiveMQDestination.createQueue(address); } else if (RoutingType.MULTICAST.equals(message.getRoutingType())) { - prefix = TOPIC_QUALIFIED_PREFIX; + dest = ActiveMQDestination.createTopic(address); + } else { + dest = ActiveMQDestination.fromPrefixedName(address.toString()); } - - dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString()); } return dest; @@ -513,7 +514,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public boolean getBooleanProperty(final String name) throws JMSException { try { - return message.getBooleanProperty(new SimpleString(name)); + return message.getBooleanProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -522,7 +523,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public byte getByteProperty(final String name) throws JMSException { try { - return message.getByteProperty(new SimpleString(name)); + return message.getByteProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -531,7 +532,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public short getShortProperty(final String name) throws JMSException { try { - return message.getShortProperty(new SimpleString(name)); + return message.getShortProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -544,7 +545,7 @@ public class ActiveMQMessage implements javax.jms.Message { } try { - return message.getIntProperty(new SimpleString(name)); + return message.getIntProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -557,7 +558,7 @@ public class ActiveMQMessage implements javax.jms.Message { } try { - return message.getLongProperty(new SimpleString(name)); + return message.getLongProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -566,7 +567,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public float getFloatProperty(final String name) throws JMSException { try { - return message.getFloatProperty(new SimpleString(name)); + return message.getFloatProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -575,7 +576,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public double getDoubleProperty(final String name) throws JMSException { try { - return message.getDoubleProperty(new SimpleString(name)); + return message.getDoubleProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -593,7 +594,7 @@ public class ActiveMQMessage implements javax.jms.Message { } else if (MessageUtil.JMSXUSERID.equals(name)) { return message.getValidatedUserID(); } else { - return message.getStringProperty(new SimpleString(name)); + return message.getStringProperty(name); } } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); @@ -608,7 +609,7 @@ public class ActiveMQMessage implements javax.jms.Message { Object val = message.getObjectProperty(name); if (val instanceof SimpleString) { - val = ((SimpleString) val).toString(); + val = val.toString(); } return val; } @@ -622,43 +623,43 @@ public class ActiveMQMessage implements javax.jms.Message { public void setBooleanProperty(final String name, final boolean value) throws JMSException { checkProperty(name); - message.putBooleanProperty(new SimpleString(name), value); + message.putBooleanProperty(name, value); } @Override public void setByteProperty(final String name, final byte value) throws JMSException { checkProperty(name); - message.putByteProperty(new SimpleString(name), value); + message.putByteProperty(name, value); } @Override public void setShortProperty(final String name, final short value) throws JMSException { checkProperty(name); - message.putShortProperty(new SimpleString(name), value); + message.putShortProperty(name, value); } @Override public void setIntProperty(final String name, final int value) throws JMSException { checkProperty(name); - message.putIntProperty(new SimpleString(name), value); + message.putIntProperty(name, value); } @Override public void setLongProperty(final String name, final long value) throws JMSException { checkProperty(name); - message.putLongProperty(new SimpleString(name), value); + message.putLongProperty(name, value); } @Override public void setFloatProperty(final String name, final float value) throws JMSException { checkProperty(name); - message.putFloatProperty(new SimpleString(name), value); + message.putFloatProperty(name, value); } @Override public void setDoubleProperty(final String name, final double value) throws JMSException { checkProperty(name); - message.putDoubleProperty(new SimpleString(name), value); + message.putDoubleProperty(name, value); } @Override @@ -670,7 +671,7 @@ public class ActiveMQMessage implements javax.jms.Message { } else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) { return; } else { - message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value)); + message.putStringProperty(name, value); } } @@ -703,7 +704,7 @@ public class ActiveMQMessage implements javax.jms.Message { } try { - message.putObjectProperty(new SimpleString(name), value); + message.putObjectProperty(name, value); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -964,7 +965,7 @@ public class ActiveMQMessage implements javax.jms.Message { boolean result = false; if (jmsPropertyName.equals(name)) { - message.putStringProperty(corePropertyName, SimpleString.toSimpleString(value.toString())); + message.putStringProperty(corePropertyName, value.toString()); result = true; }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java index 2deefa9..ff4ee0f 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client; import javax.jms.Queue; +import org.apache.activemq.artemis.api.core.SimpleString; + /** * ActiveMQ Artemis implementation of a JMS Queue. * <br> @@ -34,13 +36,17 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue { // Constructors -------------------------------------------------- public ActiveMQQueue() { - this(null); + this((SimpleString) null); } public ActiveMQQueue(final String address) { super(address, TYPE.QUEUE, null); } + public ActiveMQQueue(final SimpleString address) { + super(address, TYPE.QUEUE, null); + } + public ActiveMQQueue(final String address, boolean temporary) { super(address, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, null); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 374a985..cf2ec59 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -627,7 +627,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); } - queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName)); + queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName); if (durability == ConsumerDurability.DURABLE) { try { @@ -750,7 +750,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); } - queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName)); + queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName); QueueQuery subResponse = session.queueQuery(queueName); @@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new IllegalStateException("Cannot unsubscribe using a QueueSession"); } - SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name)); + SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name); try { QueueQuery response = session.queueQuery(queueName); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java index 2762a9c..1c70c5b 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java @@ -73,7 +73,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre // For testing only public ActiveMQStreamMessage() { - message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500); + message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500, null); } // Public -------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java index e22e67b..4dbefec 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client; import javax.jms.Topic; +import org.apache.activemq.artemis.api.core.SimpleString; + /** * ActiveMQ Artemis implementation of a JMS Topic. * <br> @@ -33,13 +35,17 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic { // Constructors -------------------------------------------------- public ActiveMQTopic() { - this(null); + this((SimpleString) null); } public ActiveMQTopic(final String address) { this(address, false); } + public ActiveMQTopic(final SimpleString address) { + super(address, TYPE.TOPIC, null); + } + public ActiveMQTopic(final String address, boolean temporary) { super(address, TYPE.TOPIC, null); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 2bdd88a..cdab412 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper; @@ -70,7 +71,7 @@ public class AMQPMessage extends RefCountMessage { boolean bufferValid; Boolean durable; long messageID; - String address; + SimpleString address; MessageImpl protonMessage; private volatile int memoryEstimate = -1; private long expiration = 0; @@ -90,6 +91,7 @@ public class AMQPMessage extends RefCountMessage { private ApplicationProperties applicationProperties; private long scheduledTime = -1; private String connectionID; + private final CoreMessageObjectPools coreMessageObjectPools; Set<Object> rejectedConsumers; @@ -98,9 +100,14 @@ public class AMQPMessage extends RefCountMessage { private volatile TypedProperties extraProperties; public AMQPMessage(long messageFormat, byte[] data) { + this(messageFormat, data, null); + } + + public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) { this.data = Unpooled.wrappedBuffer(data); this.messageFormat = messageFormat; this.bufferValid = true; + this.coreMessageObjectPools = coreMessageObjectPools; parseHeaders(); } @@ -108,12 +115,14 @@ public class AMQPMessage extends RefCountMessage { public AMQPMessage(long messageFormat) { this.messageFormat = messageFormat; this.bufferValid = false; + this.coreMessageObjectPools = null; } public AMQPMessage(long messageFormat, Message message) { this.messageFormat = messageFormat; this.protonMessage = (MessageImpl) message; this.bufferValid = false; + this.coreMessageObjectPools = null; } public AMQPMessage(Message message) { @@ -301,7 +310,7 @@ public class AMQPMessage extends RefCountMessage { parseHeaders(); if (_properties != null && _properties.getGroupId() != null) { - return SimpleString.toSimpleString(_properties.getGroupId()); + return SimpleString.toSimpleString(_properties.getGroupId(), coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool()); } else { return null; } @@ -588,36 +597,33 @@ public class AMQPMessage extends RefCountMessage { @Override public String getAddress() { - if (address == null) { - Properties properties = getProtonMessage().getProperties(); - if (properties != null) { - return properties.getTo(); - } else { - return null; - } - } else { - return address; - } + SimpleString addressSimpleString = getAddressSimpleString(); + return addressSimpleString == null ? null : addressSimpleString.toString(); } @Override public AMQPMessage setAddress(String address) { - this.address = address; + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); return this; } @Override public AMQPMessage setAddress(SimpleString address) { - if (address != null) { - return setAddress(address.toString()); - } else { - return setAddress((String) null); - } + this.address = address; + return this; } @Override public SimpleString getAddressSimpleString() { - return SimpleString.toSimpleString(getAddress()); + if (address == null) { + Properties properties = getProtonMessage().getProperties(); + if (properties != null) { + setAddress(properties.getTo()); + } else { + return null; + } + } + return address; } @Override @@ -977,7 +983,7 @@ public class AMQPMessage extends RefCountMessage { if (applicationProperties != null) getProtonMessage().setApplicationProperties(applicationProperties); if (_properties != null) { if (address != null) { - _properties.setTo(address); + _properties.setTo(address.toString()); } getProtonMessage().setProperties(this._properties); } @@ -987,7 +993,7 @@ public class AMQPMessage extends RefCountMessage { @Override public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { - return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key)); + return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key), getPropertyValuesPool()); } @Override @@ -1066,10 +1072,15 @@ public class AMQPMessage extends RefCountMessage { } @Override + public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, String value) { + return putStringProperty(key.toString(), value); + } + + @Override public Set<SimpleString> getPropertyNames() { HashSet<SimpleString> values = new HashSet<>(); for (Object k : getApplicationPropertiesMap().keySet()) { - values.add(SimpleString.toSimpleString(k.toString())); + values.add(SimpleString.toSimpleString(k.toString(), getPropertyKeysPool())); } return values; } @@ -1084,17 +1095,22 @@ public class AMQPMessage extends RefCountMessage { } @Override - public ICoreMessage toCore() { + public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { try { - return AMQPConverter.getInstance().toCore(this); + return AMQPConverter.getInstance().toCore(this, coreMessageObjectPools); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } } @Override + public ICoreMessage toCore() { + return toCore(null); + } + + @Override public SimpleString getLastValueProperty() { - return getSimpleStringProperty(HDR_LAST_VALUE_NAME.toString()); + return getSimpleStringProperty(HDR_LAST_VALUE_NAME); } @Override @@ -1155,4 +1171,12 @@ public class AMQPMessage extends RefCountMessage { ", address=" + getAddress() + "]"; } + + private SimpleString.StringSimpleStringPool getPropertyKeysPool() { + return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool(); + } + + private SimpleString.StringSimpleStringPool getPropertyValuesPool() { + return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 19348f4..7134d3b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -101,6 +102,7 @@ public class AMQPSessionCallback implements SessionCallback { private final AtomicBoolean draining = new AtomicBoolean(false); + private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>(); @@ -210,14 +212,14 @@ public class AMQPSessionCallback implements SessionCallback { } public Object createSender(ProtonServerSenderContext protonSender, - String queue, + SimpleString queue, String filter, boolean browserOnly) throws Exception { long consumerID = consumerIDGenerator.generateID(); filter = SelectorTranslator.convertToActiveMQFilterString(filter); - ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly, false, null); + ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), browserOnly, false, null); // AMQP handles its own flow control for when it's started consumer.setStarted(true); @@ -233,48 +235,48 @@ public class AMQPSessionCallback implements SessionCallback { serverConsumer.receiveCredits(-1); } - public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false); + public void createTemporaryQueue(SimpleString queueName, RoutingType routingType) throws Exception { + serverSession.createQueue(queueName, queueName, routingType, null, true, false); } - public void createTemporaryQueue(String address, - String queueName, + public void createTemporaryQueue(SimpleString address, + SimpleString queueName, RoutingType routingType, - String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false); + SimpleString filter) throws Exception { + serverSession.createQueue(address, queueName, routingType, filter, true, false); } - public void createUnsharedDurableQueue(String address, + public void createUnsharedDurableQueue(SimpleString address, RoutingType routingType, - String queueName, - String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false); + SimpleString queueName, + SimpleString filter) throws Exception { + serverSession.createQueue(address, queueName, routingType, filter, false, true, 1, false, false); } - public void createSharedDurableQueue(String address, + public void createSharedDurableQueue(SimpleString address, RoutingType routingType, - String queueName, - String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false); + SimpleString queueName, + SimpleString filter) throws Exception { + serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false); } - public void createSharedVolatileQueue(String address, + public void createSharedVolatileQueue(SimpleString address, RoutingType routingType, - String queueName, - String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true); + SimpleString queueName, + SimpleString filter) throws Exception { + serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true); } - public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception { - QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); + public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception { + QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(queueName); if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) { try { - serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true, true); + serverSession.createQueue(queueName, queueName, routingType, null, false, true, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } - queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); + queueQueryResult = serverSession.executeQueueQuery(queueName); } // if auto-create we will return whatever type was used before @@ -287,32 +289,31 @@ public class AMQPSessionCallback implements SessionCallback { - public boolean bindingQuery(String address, RoutingType routingType) throws Exception { + public boolean bindingQuery(SimpleString address, RoutingType routingType) throws Exception { BindingQueryResult bindingQueryResult = bindingQueryCache.getResult(address); if (bindingQueryResult != null) { return bindingQueryResult.isExists(); } - SimpleString simpleAddress = SimpleString.toSimpleString(address); - bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); + bindingQueryResult = serverSession.executeBindingQuery(address); if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) { try { - serverSession.createAddress(simpleAddress, routingType, true); + serverSession.createAddress(address, routingType, true); } catch (ActiveMQAddressExistsException e) { // The address may have been created by another thread in the mean time. Catch and do nothing. } - bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); + bindingQueryResult = serverSession.executeBindingQuery(address); } else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) { - QueueQueryResult queueBinding = serverSession.executeQueueQuery(simpleAddress); + QueueQueryResult queueBinding = serverSession.executeQueueQuery(address); if (!queueBinding.isExists()) { try { - serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true); + serverSession.createQueue(address, address, routingType, null, false, true, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } } - bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); + bindingQueryResult = serverSession.executeBindingQuery(address); } bindingQueryCache.setResult(address, bindingQueryResult); @@ -320,7 +321,7 @@ public class AMQPSessionCallback implements SessionCallback { } - public AddressQueryResult addressQuery(String addressName, + public AddressQueryResult addressQuery(SimpleString addressName, RoutingType routingType, boolean autoCreate) throws Exception { @@ -329,15 +330,15 @@ public class AMQPSessionCallback implements SessionCallback { return addressQueryResult; } - addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); + addressQueryResult = serverSession.executeAddressQuery(addressName); if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) { try { - serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true); + serverSession.createAddress(addressName, routingType, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } - addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); + addressQueryResult = serverSession.executeAddressQuery(addressName); } addressQueryCache.setResult(addressName, addressQueryResult); @@ -438,15 +439,15 @@ public class AMQPSessionCallback implements SessionCallback { final Transaction transaction, final Receiver receiver, final Delivery delivery, - String address, + SimpleString address, int messageFormat, byte[] data) throws Exception { - AMQPMessage message = new AMQPMessage(messageFormat, data); + AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools); if (address != null) { - message.setAddress(new SimpleString(address)); + message.setAddress(address); } else { // Anonymous relay must set a To value - address = message.getAddress(); + address = message.getAddressSimpleString(); if (address == null) { rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer"); return; @@ -552,7 +553,7 @@ public class AMQPSessionCallback implements SessionCallback { }); } - public void offerProducerCredit(final String address, + public void offerProducerCredit(final SimpleString address, final int credits, final int threshold, final Receiver receiver) { @@ -567,7 +568,7 @@ public class AMQPSessionCallback implements SessionCallback { connection.flush(); return; } - final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address)); + final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); store.checkMemory(new Runnable() { @Override public void run() { @@ -587,8 +588,8 @@ public class AMQPSessionCallback implements SessionCallback { } } - public void deleteQueue(String queueName) throws Exception { - manager.getServer().destroyQueue(new SimpleString(queueName)); + public void deleteQueue(SimpleString queueName) throws Exception { + manager.getServer().destroyQueue(queueName); } public void resetContext(OperationContext oldContext) { @@ -657,7 +658,7 @@ public class AMQPSessionCallback implements SessionCallback { } @Override - public void disconnect(ServerConsumer consumer, String queueName) { + public void disconnect(ServerConsumer consumer, SimpleString queueName) { ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName); connection.lock(); try { @@ -703,12 +704,12 @@ public class AMQPSessionCallback implements SessionCallback { return serverSession.getAddress(address); } - public void removeTemporaryQueue(String address) throws Exception { - serverSession.deleteQueue(SimpleString.toSimpleString(address)); + public void removeTemporaryQueue(SimpleString address) throws Exception { + serverSession.deleteQueue(address); } - public RoutingType getDefaultRoutingType(String address) { - return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultAddressRoutingType(); + public RoutingType getDefaultRoutingType(SimpleString address) { + return manager.getServer().getAddressSettingsRepository().getMatch(address.toString()).getDefaultAddressRoutingType(); } public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception { @@ -733,10 +734,10 @@ public class AMQPSessionCallback implements SessionCallback { class AddressQueryCache<T> { - String address; + SimpleString address; T result; - public synchronized T getResult(String parameterAddress) { + public synchronized T getResult(SimpleString parameterAddress) { if (address != null && address.equals(parameterAddress)) { return result; } else { @@ -746,7 +747,7 @@ public class AMQPSessionCallback implements SessionCallback { } } - public synchronized void setResult(String parameterAddress, T result) { + public synchronized void setResult(SimpleString parameterAddress, T result) { this.address = parameterAddress; this.result = result; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java index 724474b..e67fc67 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.converter; import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; @@ -38,7 +39,7 @@ public class AMQPConverter implements MessageConverter<AMQPMessage> { } @Override - public ICoreMessage toCore(AMQPMessage messageSource) throws Exception { - return AmqpCoreConverter.toCore(messageSource); + public ICoreMessage toCore(AMQPMessage messageSource, CoreMessageObjectPools coreMessageObjectPools) throws Exception { + return AmqpCoreConverter.toCore(messageSource, coreMessageObjectPools); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java index da2f4e0..1bac1e5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; @@ -242,56 +243,56 @@ public final class AMQPMessageSupport { return null; } - public static ServerJMSBytesMessage createBytesMessage(long id) { - return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE)); + public static ServerJMSBytesMessage createBytesMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE, coreMessageObjectPools)); } - public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException { - ServerJMSBytesMessage message = createBytesMessage(id); + public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException { + ServerJMSBytesMessage message = createBytesMessage(id, coreMessageObjectPools); message.writeBytes(array, arrayOffset, length); return message; } - public static ServerJMSStreamMessage createStreamMessage(long id) { - return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE)); + public static ServerJMSStreamMessage createStreamMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE, coreMessageObjectPools)); } - public static ServerJMSMessage createMessage(long id) { - return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE)); + public static ServerJMSMessage createMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE, coreMessageObjectPools)); } - public static ServerJMSTextMessage createTextMessage(long id) { - return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE)); + public static ServerJMSTextMessage createTextMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE, coreMessageObjectPools)); } - public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException { - ServerJMSTextMessage message = createTextMessage(id); + public static ServerJMSTextMessage createTextMessage(long id, String text, CoreMessageObjectPools coreMessageObjectPools) throws JMSException { + ServerJMSTextMessage message = createTextMessage(id, coreMessageObjectPools); message.setText(text); return message; } - public static ServerJMSObjectMessage createObjectMessage(long id) { - return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE)); + public static ServerJMSObjectMessage createObjectMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE, coreMessageObjectPools)); } - public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException { - ServerJMSObjectMessage message = createObjectMessage(id); + public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm, CoreMessageObjectPools coreMessageObjectPools) throws JMSException { + ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools); message.setSerializedForm(serializedForm); return message; } - public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException { - ServerJMSObjectMessage message = createObjectMessage(id); + public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException { + ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools); message.setSerializedForm(new Binary(array, offset, length)); return message; } - public static ServerJMSMapMessage createMapMessage(long id) { - return new ServerJMSMapMessage(newMessage(id, MAP_TYPE)); + public static ServerJMSMapMessage createMapMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSMapMessage(newMessage(id, MAP_TYPE, coreMessageObjectPools)); } - public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException { - ServerJMSMapMessage message = createMapMessage(id); + public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content, CoreMessageObjectPools coreMessageObjectPools) throws JMSException { + ServerJMSMapMessage message = createMapMessage(id, coreMessageObjectPools); final Set<Map.Entry<String, Object>> set = content.entrySet(); for (Map.Entry<String, Object> entry : set) { Object value = entry.getValue(); @@ -304,8 +305,8 @@ public final class AMQPMessageSupport { return message; } - private static CoreMessage newMessage(long id, byte messageType) { - CoreMessage message = new CoreMessage(id, 512); + private static CoreMessage newMessage(long id, byte messageType, CoreMessageObjectPools coreMessageObjectPools) { + CoreMessage message = new CoreMessage(id, 512, coreMessageObjectPools); message.setType(messageType); // ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); return message; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index fbaf0ef..80969f6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -52,6 +52,7 @@ import javax.jms.JMSException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; @@ -89,31 +90,31 @@ import io.netty.buffer.PooledByteBufAllocator; public class AmqpCoreConverter { @SuppressWarnings("unchecked") - public static ICoreMessage toCore(AMQPMessage message) throws Exception { + public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception { Section body = message.getProtonMessage().getBody(); ServerJMSMessage result; if (body == null) { if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) { - result = createObjectMessage(message.getMessageID()); + result = createObjectMessage(message.getMessageID(), coreMessageObjectPools); } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) { - result = createBytesMessage(message.getMessageID()); + result = createBytesMessage(message.getMessageID(), coreMessageObjectPools); } else { Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType()); if (charset != null) { - result = createTextMessage(message.getMessageID()); + result = createTextMessage(message.getMessageID(), coreMessageObjectPools); } else { - result = createMessage(message.getMessageID()); + result = createMessage(message.getMessageID(), coreMessageObjectPools); } } } else if (body instanceof Data) { Binary payload = ((Data) body).getValue(); if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) { - result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) { - result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); } else { Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType()); if (StandardCharsets.UTF_8.equals(charset)) { @@ -121,18 +122,18 @@ public class AmqpCoreConverter { try { CharBuffer chars = charset.newDecoder().decode(buf); - result = createTextMessage(message.getMessageID(), String.valueOf(chars)); + result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools); } catch (CharacterCodingException e) { - result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); } } else { - result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); } } } else if (body instanceof AmqpSequence) { AmqpSequence sequence = (AmqpSequence) body; - ServerJMSStreamMessage m = createStreamMessage(message.getMessageID()); + ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools); for (Object item : sequence.getValue()) { m.writeObject(item); } @@ -141,31 +142,31 @@ public class AmqpCoreConverter { } else if (body instanceof AmqpValue) { Object value = ((AmqpValue) body).getValue(); if (value == null || value instanceof String) { - result = createTextMessage(message.getMessageID(), (String) value); + result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools); } else if (value instanceof Binary) { Binary payload = (Binary) value; if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) { - result = createObjectMessage(message.getMessageID(), payload); + result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools); } else { - result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); } } else if (value instanceof List) { - ServerJMSStreamMessage m = createStreamMessage(message.getMessageID()); + ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools); for (Object item : (List<Object>) value) { m.writeObject(item); } result = m; } else if (value instanceof Map) { - result = createMapMessage(message.getMessageID(), (Map<String, Object>) value); + result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools); } else { ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); try { TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf)); TLSEncode.getEncoder().writeObject(body); - result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex()); + result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools); } finally { buf.release(); TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null); @@ -186,7 +187,7 @@ public class AmqpCoreConverter { result.getInnerMessage().setReplyTo(message.getReplyTo()); result.getInnerMessage().setDurable(message.isDurable()); result.getInnerMessage().setPriority(message.getPriority()); - result.getInnerMessage().setAddress(message.getAddress()); + result.getInnerMessage().setAddress(message.getAddressSimpleString()); result.encode(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 3e1c0fe..3c35d76 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -54,7 +54,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements protected final Receiver receiver; - protected String address; + protected SimpleString address; protected final AMQPSessionCallback sessionSPI; @@ -102,7 +102,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (target.getDynamic()) { // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and // will be deleted on closing of the session - address = sessionSPI.tempQueueName(); + address = SimpleString.toSimpleString(sessionSPI.tempQueueName()); defRoutingType = getRoutingType(target.getCapabilities(), address); try { @@ -113,12 +113,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH; - target.setAddress(address); + target.setAddress(address.toString()); } else { // the target will have an address unless the remote is requesting an anonymous // relay in which case the address in the incoming message's to field will be // matched on receive of the message. - address = target.getAddress(); + address = SimpleString.toSimpleString(target.getAddress()); if (address != null && !address.isEmpty()) { defRoutingType = getRoutingType(target.getCapabilities(), address); @@ -134,7 +134,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } try { - sessionSPI.check(SimpleString.toSimpleString(address), CheckType.SEND, new SecurityAuth() { + sessionSPI.check(address, CheckType.SEND, new SecurityAuth() { @Override public String getUsername() { String username = null; @@ -181,12 +181,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements flow(amqpCredits, minCreditRefresh); } - public RoutingType getRoutingType(Receiver receiver, String address) { + public RoutingType getRoutingType(Receiver receiver, SimpleString address) { org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); return target != null ? getRoutingType(target.getCapabilities(), address) : getRoutingType((Symbol[]) null, address); } - private RoutingType getRoutingType(Symbol[] symbols, String address) { + private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) { if (symbols != null) { for (Symbol symbol : symbols) { if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { @@ -264,7 +264,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { try { - sessionSPI.removeTemporaryQueue(target.getAddress()); + sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(target.getAddress())); } catch (Exception e) { //ignore on close, its temp anyway and will be removed later } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index fbaae8a..1823168 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -102,7 +102,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private boolean shared = false; private boolean global = false; private boolean isVolatile = false; - private String tempQueueName; + private SimpleString tempQueueName; public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, @@ -157,7 +157,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr super.initialise(); Source source = (Source) sender.getRemoteSource(); - String queue = null; + SimpleString queue = null; String selector = null; final Map<Symbol, Object> supportedFilters = new HashMap<>(); @@ -199,7 +199,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // the lifetime policy and capabilities of the new subscription. if (result.isExists()) { source = new org.apache.qpid.proton.amqp.messaging.Source(); - source.setAddress(queue); + source.setAddress(queue.toString()); source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); source.setDistributionMode(COPY); @@ -240,7 +240,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else if (source.getDynamic()) { // if dynamic we have to create the node (queue) and set the address on the target, the // node is temporary and will be deleted on closing of the session - queue = java.util.UUID.randomUUID().toString(); + queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString()); tempQueueName = queue; try { sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST); @@ -248,7 +248,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } - source.setAddress(queue); + source.setAddress(queue.toString()); } else { SimpleString addressToUse; SimpleString queueNameToUse = null; @@ -269,7 +269,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr multicast = hasCapabilities(TOPIC, source); AddressQueryResult addressQueryResult = null; try { - addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true); + addressQueryResult = sessionSPI.addressQuery(addressToUse, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true); } catch (ActiveMQSecurityException e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage()); } catch (ActiveMQAMQPException e) { @@ -294,7 +294,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // if not we look up the address AddressQueryResult addressQueryResult = null; try { - addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true); + addressQueryResult = sessionSPI.addressQuery(addressToUse, defaultRoutingType, true); } catch (ActiveMQSecurityException e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage()); } catch (ActiveMQAMQPException e) { @@ -333,6 +333,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST); + SimpleString simpleStringSelector = SimpleString.toSimpleString(selector); //if the address specifies a broker configured queue then we always use this, treat it as a queue if (queue != null) { @@ -345,24 +346,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr String pubId = sender.getName(); queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false); QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false); - if (result.isExists()) { // If a client reattaches to a durable subscription with a different no-local // filter value, selector or address then we must recreate the queue (JMS semantics). - if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { + if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { if (result.getConsumerCount() == 0) { sessionSPI.deleteQueue(queue); - sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); } else { throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist"); } } } else { if (shared) { - sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); } else { - sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); } } } else { @@ -371,15 +371,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (shared && sender.getName() != null) { queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile); try { - sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); } catch (ActiveMQQueueExistsException e) { //this is ok, just means its shared } } else { - queue = java.util.UUID.randomUUID().toString(); + queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString()); tempQueueName = queue; try { - sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector); + sessionSPI.createTemporaryQueue(addressToUse, queue, RoutingType.MULTICAST, simpleStringSelector); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } @@ -387,18 +387,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else { if (queueNameToUse != null) { - SimpleString matchingAnycastQueue = SimpleString.toSimpleString(getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST)); + SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST); if (matchingAnycastQueue != null) { - queue = matchingAnycastQueue.toString(); + queue = matchingAnycastQueue; } else { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); } } else { SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST); if (matchingAnycastQueue != null) { - queue = matchingAnycastQueue.toString(); + queue = matchingAnycastQueue; } else { - queue = addressToUse.toString(); + queue = addressToUse; } } @@ -437,16 +437,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - private String getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception { + private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception { if (queueName != null) { - QueueQueryResult result = sessionSPI.queueQuery(queueName.toString(), routingType, false); + QueueQueryResult result = sessionSPI.queueQuery(queueName, routingType, false); if (!result.isExists()) { throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist"); } else { if (!result.getAddress().equals(address)) { throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'"); } - return sessionSPI.getMatchingQueue(address, queueName, routingType).toString(); + return sessionSPI.getMatchingQueue(address, queueName, routingType); } } return null; @@ -495,7 +495,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (remoteLinkClose) { Source source = (Source) sender.getSource(); if (source != null && source.getAddress() != null && multicast) { - String queueName = source.getAddress(); + SimpleString queueName = SimpleString.toSimpleString(source.getAddress()); QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false); if (result.isExists() && source.getDynamic()) { sessionSPI.deleteQueue(queueName); @@ -508,7 +508,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (pubId.contains("|")) { pubId = pubId.split("\\|")[0]; } - String queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile); + SimpleString queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile); result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false); //only delete if it isn't volatile and has no consumers if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) { @@ -518,7 +518,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { try { - sessionSPI.removeTemporaryQueue(source.getAddress()); + sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(source.getAddress())); } catch (Exception e) { //ignore on close, its temp anyway and will be removed later } @@ -760,7 +760,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return false; } - private static String createQueueName(boolean useCoreSubscriptionNaming, + private static SimpleString createQueueName(boolean useCoreSubscriptionNaming, String clientId, String pubId, boolean shared, @@ -784,7 +784,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr queue += ":global"; } } - return queue; + return SimpleString.toSimpleString(queue); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java index d06464f..ccafd37 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java @@ -620,7 +620,7 @@ public class JMSMappingOutboundTransformerTest { } private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) { - ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0); + ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0, null); if (compression) { // TODO @@ -647,7 +647,7 @@ public class JMSMappingOutboundTransformerTest { } private ServerJMSTextMessage createTextMessage(String text, boolean compression) { - ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0); + ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0, null); if (compression) { // TODO http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index 73dbeaa..da10f47 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -22,6 +22,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -56,11 +57,12 @@ public class MQTTSession { private MQTTProtocolManager protocolManager; - private boolean isClean; private WildcardConfiguration wildcardConfiguration; + private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); + public MQTTSession(MQTTProtocolHandler protocolHandler, MQTTConnection connection, MQTTProtocolManager protocolManager, @@ -195,4 +197,8 @@ public class MQTTSession { public void setWildcardConfiguration(WildcardConfiguration wildcardConfiguration) { this.wildcardConfiguration = wildcardConfiguration; } + + public CoreMessageObjectPools getCoreMessageObjectPools() { + return coreMessageObjectPools; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 21b1f2b..39e2ba9 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -78,7 +78,7 @@ public class MQTTSessionCallback implements SessionCallback { } @Override - public void disconnect(ServerConsumer consumer, String queueName) { + public void disconnect(ServerConsumer consumer, SimpleString queueName) { try { consumer.removeItself(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 2cb1f7e..2667f81 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -64,13 +64,13 @@ public class MQTTUtil { public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain."; - public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level"; + public static final SimpleString MQTT_QOS_LEVEL_KEY = SimpleString.toSimpleString("mqtt.qos.level"); - public static final String MQTT_MESSAGE_ID_KEY = "mqtt.message.id"; + public static final SimpleString MQTT_MESSAGE_ID_KEY = SimpleString.toSimpleString("mqtt.message.id"); - public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type"; + public static final SimpleString MQTT_MESSAGE_TYPE_KEY = SimpleString.toSimpleString("mqtt.message.type"); - public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain"); + public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = SimpleString.toSimpleString("mqtt.message.retain"); public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; @@ -113,10 +113,10 @@ public class MQTTUtil { int qos) { long id = session.getServer().getStorageManager().generateID(); - CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE); + CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE, session.getCoreMessageObjectPools()); message.setAddress(address); message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain); - message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos); + message.putIntProperty(MQTT_QOS_LEVEL_KEY, qos); message.setType(Message.BYTES_TYPE); return message; } @@ -127,7 +127,8 @@ public class MQTTUtil { int qos, ByteBuf payload) { String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration()); - ICoreMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); + SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool()); + ICoreMessage message = createServerMessage(session, address, retain, qos); message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes()); return message; @@ -135,8 +136,8 @@ public class MQTTUtil { public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) { Message message = createServerMessage(session, address, false, 1); - message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId); - message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value()); + message.putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId); + message.putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY, MqttMessageType.PUBREL.value()); return message; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 9923953..86a95db 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1121,7 +1121,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception { - SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); + SimpleString subQueueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()); server.destroyQueue(subQueueName); return null; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 6af9997..83ff6d6 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.MessageReference; @@ -108,10 +109,11 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag } @Override - public ICoreMessage toCore(OpenwireMessage pureMessage) throws Exception { + public ICoreMessage toCore(OpenwireMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception { return null; } + // @Override public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) { // TODO: implement this @@ -119,10 +121,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag } // @Override - public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception { + public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception { Message messageSend = (Message) message; - CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize()); + CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools); String type = messageSend.getType(); if (type != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java index d28eda4..c63fe19 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessageListener; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; // TODO: Implement this @@ -442,6 +443,11 @@ public class OpenwireMessage implements Message { } @Override + public Message putStringProperty(SimpleString key, String value) { + return null; + } + + @Override public int getEncodeSize() { return 0; } @@ -478,6 +484,11 @@ public class OpenwireMessage implements Message { @Override public ICoreMessage toCore() { + return toCore(null); + } + + @Override + public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { return null; }