ARTEMIS-1586 Refactor to make more generic

* Move byte util code into ByteUtil
* Re-use the new equals method in SimpleString
* Apply same pools/interners to client decode
* Create String to SimpleString pools/interners for property access via String 
keys (producer and consumer benefits)
* Lazily init the pools within the get methods of CoreMessageObjectPools that 
return the specific pool, to avoid having this scattered everywhere.
* Reduce SimpleString creation in conversion to/from core message methods in 
the JMS wrapper.
* Reduce SimpleString creation in conversion to/from Core in OpenWire, AMQP, 
MQTT.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/98028cde
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/98028cde
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/98028cde

Branch: refs/heads/master
Commit: 98028cdecc6bd31c86a7d6decfed1961e46be7b2
Parents: 8d776ed
Author: Michael André Pearce <[email protected]>
Authored: Wed Jan 10 08:48:14 2018 +0000
Committer: Michael Pearce <[email protected]>
Committed: Wed Jan 17 09:33:41 2018 +0100

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java | 237 ++++++++-----------
 .../artemis/utils/AbstractByteBufPool.java      | 164 +++++++++++++
 .../artemis/utils/AbstractInterner.java         | 157 ------------
 .../activemq/artemis/utils/AbstractPool.java    |  89 +++++++
 .../apache/activemq/artemis/utils/ByteUtil.java | 122 ++++++++++
 .../utils/collections/TypedProperties.java      | 122 +++++++---
 .../activemq/artemis/api/core/Message.java      |   6 +
 .../core/client/impl/ClientMessageImpl.java     |  23 +-
 .../core/client/impl/ClientSessionImpl.java     |   5 +-
 .../artemis/core/message/impl/CoreMessage.java  | 107 +++++----
 .../message/impl/CoreMessageObjectPools.java    |  55 +++++
 .../core/protocol/ClientPacketDecoder.java      |  11 +-
 .../impl/ActiveMQClientProtocolManager.java     |   2 +-
 .../activemq/artemis/reader/MessageUtil.java    |  10 +-
 .../artemis/message/CoreMessageTest.java        |   2 +-
 .../artemis/jms/client/ActiveMQDestination.java |  20 +-
 .../artemis/jms/client/ActiveMQMessage.java     |  55 ++---
 .../artemis/jms/client/ActiveMQQueue.java       |   8 +-
 .../artemis/jms/client/ActiveMQSession.java     |   6 +-
 .../jms/client/ActiveMQStreamMessage.java       |   2 +-
 .../artemis/jms/client/ActiveMQTopic.java       |   8 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  74 ++++--
 .../amqp/broker/AMQPSessionCallback.java        | 105 ++++----
 .../protocol/amqp/converter/AMQPConverter.java  |   5 +-
 .../amqp/converter/AMQPMessageSupport.java      |  49 ++--
 .../amqp/converter/AmqpCoreConverter.java       |  37 +--
 .../proton/ProtonServerReceiverContext.java     |  16 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  54 ++---
 .../JMSMappingOutboundTransformerTest.java      |   4 +-
 .../artemis/core/protocol/mqtt/MQTTSession.java |   8 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java |   2 +-
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  19 +-
 .../protocol/openwire/OpenWireConnection.java   |   2 +-
 .../openwire/OpenWireMessageConverter.java      |   8 +-
 .../core/protocol/openwire/OpenwireMessage.java |  11 +
 .../core/protocol/openwire/amq/AMQConsumer.java |   2 +-
 .../core/protocol/openwire/amq/AMQSession.java  |  11 +-
 .../core/protocol/stomp/StompSession.java       |   2 +-
 .../ra/inflow/ActiveMQMessageHandler.java       |   2 +-
 .../core/protocol/ServerPacketDecoder.java      |  25 +-
 .../protocol/core/impl/CoreSessionCallback.java |  11 +-
 .../core/server/impl/ServerConsumerImpl.java    |   2 +-
 .../spi/core/protocol/MessageConverter.java     |   3 +-
 .../spi/core/protocol/SessionCallback.java      |   2 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  11 +
 .../integration/client/AcknowledgeTest.java     |  11 +
 .../integration/client/HangConsumerTest.java    |   2 +-
 .../persistence/XmlImportExportTest.java        |   2 +-
 48 files changed, 1048 insertions(+), 643 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index e24e245..dbf7468 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -21,8 +21,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.util.internal.PlatformDependent;
-import org.apache.activemq.artemis.utils.AbstractInterner;
+import org.apache.activemq.artemis.utils.AbstractByteBufPool;
+import org.apache.activemq.artemis.utils.AbstractPool;
+import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
@@ -33,129 +34,6 @@ import org.apache.activemq.artemis.utils.DataConstants;
  */
 public final class SimpleString implements CharSequence, Serializable, 
Comparable<SimpleString> {
 
-   public static final class Interner extends AbstractInterner<SimpleString> {
-
-      private final int maxLength;
-
-      public Interner(final int capacity, final int maxCharsLength) {
-         super(capacity);
-         this.maxLength = maxCharsLength;
-      }
-
-      @Override
-      protected boolean isEqual(final SimpleString entry, final ByteBuf 
byteBuf, final int offset, final int length) {
-         return SimpleString.isEqual(entry, byteBuf, offset, length);
-      }
-
-      @Override
-      protected boolean canIntern(final ByteBuf byteBuf, final int length) {
-         assert length % 2 == 0 : "length must be a multiple of 2";
-         final int expectedStringLength = length >> 1;
-         return expectedStringLength <= maxLength;
-      }
-
-      @Override
-      protected SimpleString create(final ByteBuf byteBuf, final int length) {
-         return readSimpleString(byteBuf, length);
-      }
-   }
-
-   /**
-    * Returns {@code true} if  the {@link SimpleString} encoded content into 
{@code bytes} is equals to {@code s},
-    * {@code false} otherwise.
-    * <p>
-    * It assumes that the {@code bytes} content is read using {@link 
SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
-    * length field.
-    */
-   public static boolean isEqual(final SimpleString s, final ByteBuf bytes, 
final int offset, final int length) {
-      if (s == null) {
-         return false;
-      }
-      final byte[] chars = s.getData();
-      if (chars.length != length)
-         return false;
-      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
-         if ((offset + length) > bytes.writerIndex()) {
-            throw new IndexOutOfBoundsException();
-         }
-         if (bytes.hasArray()) {
-            return batchOnHeapIsEqual(chars, bytes.array(), 
bytes.arrayOffset() + offset, length);
-         } else if (bytes.hasMemoryAddress()) {
-            return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, 
length);
-         }
-      }
-      return byteBufIsEqual(chars, bytes, offset, length);
-   }
-
-   private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf 
bytes, final int offset, final int length) {
-      for (int i = 0; i < length; i++)
-         if (chars[i] != bytes.getByte(offset + i))
-            return false;
-      return true;
-   }
-
-   private static boolean batchOnHeapIsEqual(final byte[] chars,
-                                             final byte[] array,
-                                             final int arrayOffset,
-                                             final int length) {
-      final int longCount = length >>> 3;
-      final int bytesCount = length & 7;
-      int bytesIndex = arrayOffset;
-      int charsIndex = 0;
-      for (int i = 0; i < longCount; i++) {
-         final long charsLong = PlatformDependent.getLong(chars, charsIndex);
-         final long bytesLong = PlatformDependent.getLong(array, bytesIndex);
-         if (charsLong != bytesLong) {
-            return false;
-
-         }
-         bytesIndex += 8;
-         charsIndex += 8;
-      }
-      for (int i = 0; i < bytesCount; i++) {
-         final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
-         final byte bytesByte = PlatformDependent.getByte(array, bytesIndex);
-         if (charsByte != bytesByte) {
-            return false;
-
-         }
-         bytesIndex++;
-         charsIndex++;
-      }
-      return true;
-   }
-
-   private static boolean batchOffHeapIsEqual(final byte[] chars,
-                                              final long address,
-                                              final int offset,
-                                              final int length) {
-      final int longCount = length >>> 3;
-      final int bytesCount = length & 7;
-      long bytesAddress = address + offset;
-      int charsIndex = 0;
-      for (int i = 0; i < longCount; i++) {
-         final long charsLong = PlatformDependent.getLong(chars, charsIndex);
-         final long bytesLong = PlatformDependent.getLong(bytesAddress);
-         if (charsLong != bytesLong) {
-            return false;
-
-         }
-         bytesAddress += 8;
-         charsIndex += 8;
-      }
-      for (int i = 0; i < bytesCount; i++) {
-         final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
-         final byte bytesByte = PlatformDependent.getByte(bytesAddress);
-         if (charsByte != bytesByte) {
-            return false;
-
-         }
-         bytesAddress++;
-         charsIndex++;
-      }
-      return true;
-   }
-
    private static final long serialVersionUID = 4204223851422244307L;
 
    // Attributes
@@ -185,6 +63,13 @@ public final class SimpleString implements CharSequence, 
Serializable, Comparabl
       return new SimpleString(string);
    }
 
+   public static SimpleString toSimpleString(final String string, 
StringSimpleStringPool pool) {
+      if (pool == null) {
+         return toSimpleString(string);
+      }
+      return pool.getOrCreate(string);
+   }
+
    // Constructors
    // ----------------------------------------------------------------------
 
@@ -236,6 +121,10 @@ public final class SimpleString implements CharSequence, 
Serializable, Comparabl
       data[1] = high;
    }
 
+   public boolean isEmpty() {
+      return data.length == 0;
+   }
+
    // CharSequence implementation
    // 
---------------------------------------------------------------------------
 
@@ -267,11 +156,26 @@ public final class SimpleString implements CharSequence, 
Serializable, Comparabl
       return readSimpleString(buffer);
    }
 
+   public static SimpleString readNullableSimpleString(ByteBuf buffer, 
ByteBufSimpleStringPool pool) {
+      int b = buffer.readByte();
+      if (b == DataConstants.NULL) {
+         return null;
+      }
+      return readSimpleString(buffer, pool);
+   }
+
    public static SimpleString readSimpleString(ByteBuf buffer) {
       int len = buffer.readInt();
       return readSimpleString(buffer, len);
    }
 
+   public static SimpleString readSimpleString(ByteBuf buffer, 
ByteBufSimpleStringPool pool) {
+      if (pool == null) {
+         return readSimpleString(buffer);
+      }
+      return pool.getOrCreate(buffer);
+   }
+
    public static SimpleString readSimpleString(final ByteBuf buffer, final int 
length) {
       byte[] data = new byte[length];
       buffer.readBytes(data);
@@ -381,22 +285,23 @@ public final class SimpleString implements CharSequence, 
Serializable, Comparabl
       if (other instanceof SimpleString) {
          SimpleString s = (SimpleString) other;
 
-         if (data.length != s.data.length) {
-            return false;
-         }
-
-         for (int i = 0; i < data.length; i++) {
-            if (data[i] != s.data[i]) {
-               return false;
-            }
-         }
-
-         return true;
+         return ByteUtil.equals(data, s.data);
       } else {
          return false;
       }
    }
 
+   /**
+    * Returns {@code true} if  the {@link SimpleString} encoded content into 
{@code bytes} is equals to {@code s},
+    * {@code false} otherwise.
+    * <p>
+    * It assumes that the {@code bytes} content is read using {@link 
SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
+    * length field.
+    */
+   public boolean equals(final ByteBuf byteBuf, final int offset, final int 
length) {
+      return ByteUtil.equals(data, byteBuf, offset, length);
+   }
+
    @Override
    public int hashCode() {
       if (hash == 0) {
@@ -575,4 +480,64 @@ public final class SimpleString implements CharSequence, 
Serializable, Comparabl
          dst[d++] = (char) (low | high);
       }
    }
+
+   public static final class ByteBufSimpleStringPool extends 
AbstractByteBufPool<SimpleString> {
+
+      private static final int UUID_LENGTH = 36;
+
+      private final int maxLength;
+
+      public ByteBufSimpleStringPool() {
+         this.maxLength = UUID_LENGTH;
+      }
+
+      public ByteBufSimpleStringPool(final int capacity, final int 
maxCharsLength) {
+         super(capacity);
+         this.maxLength = maxCharsLength;
+      }
+
+      @Override
+      protected boolean isEqual(final SimpleString entry, final ByteBuf 
byteBuf, final int offset, final int length) {
+         if (entry == null) {
+            return false;
+         }
+         return entry.equals(byteBuf, offset, length);
+      }
+
+      @Override
+      protected boolean canPool(final ByteBuf byteBuf, final int length) {
+         assert length % 2 == 0 : "length must be a multiple of 2";
+         final int expectedStringLength = length >> 1;
+         return expectedStringLength <= maxLength;
+      }
+
+      @Override
+      protected SimpleString create(final ByteBuf byteBuf, final int length) {
+         return readSimpleString(byteBuf, length);
+      }
+   }
+
+   public static final class StringSimpleStringPool extends 
AbstractPool<String, SimpleString> {
+
+      public StringSimpleStringPool() {
+         super();
+      }
+
+      public StringSimpleStringPool(final int capacity) {
+         super(capacity);
+      }
+
+      @Override
+      protected SimpleString create(String value) {
+         return toSimpleString(value);
+      }
+
+      @Override
+      protected boolean isEqual(SimpleString entry, String value) {
+         if (entry == null) {
+            return false;
+         }
+         return entry.toString().equals(value);
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java
new file mode 100644
index 0000000..87c1b6f
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.MathUtil;
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Thread-safe {@code <T>} interner.
+ * <p>
+ * Differently from {@link String#intern()} it contains a fixed amount of 
entries and
+ * when used by concurrent threads it doesn't ensure the uniqueness of the 
entries ie
+ * the same entry could be allocated multiple times by concurrent calls.
+ */
+public abstract class AbstractByteBufPool<T> {
+
+   public static final int DEFAULT_POOL_CAPACITY = 32;
+
+   private final T[] entries;
+   private final int mask;
+   private final int shift;
+
+   public AbstractByteBufPool() {
+      this(DEFAULT_POOL_CAPACITY);
+   }
+
+   public AbstractByteBufPool(final int capacity) {
+      entries = (T[]) new 
Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
+      mask = entries.length - 1;
+      //log2 of entries.length
+      shift = 31 - Integer.numberOfLeadingZeros(entries.length);
+   }
+
+   /**
+    * Batch hash code implementation that works at its best if {@code bytes}
+    * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} 
encoded.
+    */
+   private static int hashCode(final ByteBuf bytes, final int offset, final 
int length) {
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         //if the platform allows it, the hash code could be computed without 
bounds checking
+         if (bytes.hasArray()) {
+            return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, 
length);
+         } else if (bytes.hasMemoryAddress()) {
+            return offHeapHashCode(bytes.memoryAddress(), offset, length);
+         }
+      }
+      return byteBufHashCode(bytes, offset, length);
+   }
+
+   private static int onHeapHashCode(final byte[] bytes, final int offset, 
final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, 
arrayIndex);
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, 
arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   private static int offHeapHashCode(final long address, final int offset, 
final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getShort(address + 
arrayIndex);
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getByte(address + 
arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, 
final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         final short shortLE = byteBuf.getShortLE(arrayIndex);
+         final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? 
Short.reverseBytes(shortLE) : shortLE;
+         hashCode = 31 * hashCode + nativeShort;
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   /**
+    * Returns {@code true} if {@code length}'s {@code byteBuf} content from 
{@link ByteBuf#readerIndex()} can be pooled,
+    * {@code false} otherwise.
+    */
+   protected abstract boolean canPool(ByteBuf byteBuf, int length);
+
+   /**
+    * Create a new entry.
+    */
+   protected abstract T create(ByteBuf byteBuf, int length);
+
+   /**
+    * Returns {@code true} if the {@code entry} content is the same of {@code 
byteBuf} at the specified {@code offset}
+    * and {@code length} {@code false} otherwise.
+    */
+   protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, 
int length);
+
+   /**
+    * Returns a pooled entry if possible, a new one otherwise.
+    * <p>
+    * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by 
{@code length} after it.
+    */
+   public final T getOrCreate(final ByteBuf byteBuf) {
+      final int length = byteBuf.readInt();
+      if (!canPool(byteBuf, length)) {
+         return create(byteBuf, length);
+      } else {
+         if (!byteBuf.isReadable(length)) {
+            throw new IndexOutOfBoundsException();
+         }
+         final int bytesOffset = byteBuf.readerIndex();
+         final int hashCode = hashCode(byteBuf, bytesOffset, length);
+         //fast % operation with power of 2 entries.length
+         final int firstIndex = hashCode & mask;
+         final T firstEntry = entries[firstIndex];
+         if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
+            byteBuf.skipBytes(length);
+            return firstEntry;
+         }
+         final int secondIndex = (hashCode >> shift) & mask;
+         final T secondEntry = entries[secondIndex];
+         if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
+            byteBuf.skipBytes(length);
+            return secondEntry;
+         }
+         final T internedEntry = create(byteBuf, length);
+         final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
+         entries[entryIndex] = internedEntry;
+         return internedEntry;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
deleted file mode 100644
index 7e1fe40..0000000
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.utils;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.internal.MathUtil;
-import io.netty.util.internal.PlatformDependent;
-
-/**
- * Thread-safe {@code <T>} interner.
- * <p>
- * Differently from {@link String#intern()} it contains a fixed amount of 
entries and
- * when used by concurrent threads it doesn't ensure the uniqueness of the 
entries ie
- * the same entry could be allocated multiple times by concurrent calls.
- */
-public abstract class AbstractInterner<T> {
-
-   private final T[] entries;
-   private final int mask;
-   private final int shift;
-
-   public AbstractInterner(final int capacity) {
-      entries = (T[]) new 
Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
-      mask = entries.length - 1;
-      //log2 of entries.length
-      shift = 31 - Integer.numberOfLeadingZeros(entries.length);
-   }
-
-   /**
-    * Batch hash code implementation that works at its best if {@code bytes}
-    * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} 
encoded.
-    */
-   private static int hashCode(final ByteBuf bytes, final int offset, final 
int length) {
-      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
-         //if the platform allows it, the hash code could be computed without 
bounds checking
-         if (bytes.hasArray()) {
-            return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, 
length);
-         } else if (bytes.hasMemoryAddress()) {
-            return offHeapHashCode(bytes.memoryAddress(), offset, length);
-         }
-      }
-      return byteBufHashCode(bytes, offset, length);
-   }
-
-   private static int onHeapHashCode(final byte[] bytes, final int offset, 
final int length) {
-      final int intCount = length >>> 1;
-      final int byteCount = length & 1;
-      int hashCode = 1;
-      int arrayIndex = offset;
-      for (int i = 0; i < intCount; i++) {
-         hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, 
arrayIndex);
-         arrayIndex += 2;
-      }
-      for (int i = 0; i < byteCount; i++) {
-         hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, 
arrayIndex++);
-      }
-      return hashCode;
-   }
-
-   private static int offHeapHashCode(final long address, final int offset, 
final int length) {
-      final int intCount = length >>> 1;
-      final int byteCount = length & 1;
-      int hashCode = 1;
-      int arrayIndex = offset;
-      for (int i = 0; i < intCount; i++) {
-         hashCode = 31 * hashCode + PlatformDependent.getShort(address + 
arrayIndex);
-         arrayIndex += 2;
-      }
-      for (int i = 0; i < byteCount; i++) {
-         hashCode = 31 * hashCode + PlatformDependent.getByte(address + 
arrayIndex++);
-      }
-      return hashCode;
-   }
-
-   private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, 
final int length) {
-      final int intCount = length >>> 1;
-      final int byteCount = length & 1;
-      int hashCode = 1;
-      int arrayIndex = offset;
-      for (int i = 0; i < intCount; i++) {
-         final short shortLE = byteBuf.getShortLE(arrayIndex);
-         final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? 
Short.reverseBytes(shortLE) : shortLE;
-         hashCode = 31 * hashCode + nativeShort;
-         arrayIndex += 2;
-      }
-      for (int i = 0; i < byteCount; i++) {
-         hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
-      }
-      return hashCode;
-   }
-
-   /**
-    * Returns {@code true} if {@code length}'s {@code byteBuf} content from 
{@link ByteBuf#readerIndex()} can be interned,
-    * {@code false} otherwise.
-    */
-   protected abstract boolean canIntern(ByteBuf byteBuf, int length);
-
-   /**
-    * Create a new entry.
-    */
-   protected abstract T create(ByteBuf byteBuf, int length);
-
-   /**
-    * Returns {@code true} if the {@code entry} content is the same of {@code 
byteBuf} at the specified {@code offset}
-    * and {@code length} {@code false} otherwise.
-    */
-   protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, 
int length);
-
-   /**
-    * Returns and interned entry if possible, a new one otherwise.
-    * <p>
-    * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by 
{@code length} after it.
-    */
-   public final T intern(final ByteBuf byteBuf, final int length) {
-      if (!canIntern(byteBuf, length)) {
-         return create(byteBuf, length);
-      } else {
-         if (!byteBuf.isReadable(length)) {
-            throw new IndexOutOfBoundsException();
-         }
-         final int bytesOffset = byteBuf.readerIndex();
-         final int hashCode = hashCode(byteBuf, bytesOffset, length);
-         //fast % operation with power of 2 entries.length
-         final int firstIndex = hashCode & mask;
-         final T firstEntry = entries[firstIndex];
-         if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
-            byteBuf.skipBytes(length);
-            return firstEntry;
-         }
-         final int secondIndex = (hashCode >> shift) & mask;
-         final T secondEntry = entries[secondIndex];
-         if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
-            byteBuf.skipBytes(length);
-            return secondEntry;
-         }
-         final T internedEntry = create(byteBuf, length);
-         final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
-         entries[entryIndex] = internedEntry;
-         return internedEntry;
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java
new file mode 100644
index 0000000..cc42e8f
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.MathUtil;
+
+/**
+ * Thread-safe {@code <T>} interner.
+ * <p>
+ * Differently from {@link String#intern()} it contains a fixed amount of 
entries and
+ * when used by concurrent threads it doesn't ensure the uniqueness of the 
entries ie
+ * the same entry could be allocated multiple times by concurrent calls.
+ */
+public abstract class AbstractPool<I, O> {
+
+   public static final int DEFAULT_POOL_CAPACITY = 32;
+
+   private final O[] entries;
+   private final int mask;
+   private final int shift;
+
+   public AbstractPool() {
+      this(DEFAULT_POOL_CAPACITY);
+   }
+
+   public AbstractPool(final int capacity) {
+      entries = (O[]) new 
Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
+      mask = entries.length - 1;
+      //log2 of entries.length
+      shift = 31 - Integer.numberOfLeadingZeros(entries.length);
+   }
+
+   /**
+    * Create a new entry.
+    */
+   protected abstract O create(I value);
+
+   /**
+    * Returns {@code true} if the {@code entry} content is equal to {@code 
value};
+    */
+   protected abstract boolean isEqual(O entry, I value);
+
+   protected int hashCode(I value) {
+      return value.hashCode();
+   }
+
+   /**
+    * Returns and interned entry if possible, a new one otherwise.
+    * <p>
+    * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by 
{@code length} after it.
+    */
+   public final O getOrCreate(final I value) {
+      if (value == null) {
+         return null;
+      }
+      final int hashCode = hashCode(value);
+      //fast % operation with power of 2 entries.length
+      final int firstIndex = hashCode & mask;
+      final O firstEntry = entries[firstIndex];
+      if (isEqual(firstEntry, value)) {
+         return firstEntry;
+      }
+      final int secondIndex = (hashCode >> shift) & mask;
+      final O secondEntry = entries[secondIndex];
+      if (isEqual(secondEntry, value)) {
+         return secondEntry;
+      }
+      final O internedEntry = create(value);
+      final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
+      entries[entryIndex] = internedEntry;
+      return internedEntry;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index e70891d..8835797 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -22,6 +22,7 @@ import java.util.regex.Pattern;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@@ -207,4 +208,125 @@ public class ByteUtil {
          throw ActiveMQUtilBundle.BUNDLE.failedToParseLong(text);
       }
    }
+
+   /**
+    * Returns {@code true} if {@code left} and {@code right} have the same length and the same content,
+    * or if both are {@code null}; {@code false} otherwise.
+    */
+   public static boolean equals(final byte[] left, final byte[] right) {
+      //right.length cannot be read before ruling out a null right array
+      if (right == null) {
+         return left == null;
+      }
+      return equals(left, right, 0, right.length);
+   }
+
+   /**
+    * Returns {@code true} if the whole content of {@code left} is equal to the {@code rightLength} bytes of
+    * {@code right} starting at {@code rightOffset}, {@code false} otherwise.
+    * <p>
+    * Two {@code null} arrays are considered equal; a {@code null} array is never equal to a non-null one.
+    *
+    * @throws IndexOutOfBoundsException if the requested range exceeds the bounds of {@code right}; the plain loop
+    *                                   path may instead return {@code false} on a mismatch found before the
+    *                                   out of bounds index is reached
+    */
+   public static boolean equals(final byte[] left,
+                                final byte[] right,
+                                final int rightOffset,
+                                final int rightLength) {
+      if (left == null || right == null) {
+         return left == right;
+      }
+      if (left.length != rightLength) {
+         return false;
+      }
+      //the identity shortcut is valid only when the array is compared against itself in full
+      if (left == right && rightOffset == 0) {
+         return true;
+      }
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         //validate the range up front, as the ByteBuf overload does before taking the unsafe path
+         if (rightOffset < 0 || rightLength > right.length - rightOffset) {
+            throw new IndexOutOfBoundsException();
+         }
+         return equalsUnsafe(left, right, rightOffset, rightLength);
+      } else {
+         return equalsSafe(left, right, rightOffset, rightLength);
+      }
+   }
+
+   /**
+    * Byte by byte comparison of {@code left} against {@code right[rightOffset, rightOffset + rightLength)}
+    * using plain array accesses.
+    */
+   private static boolean equalsSafe(byte[] left, byte[] right, int rightOffset, int rightLength) {
+      int leftIndex = 0;
+      int rightIndex = rightOffset;
+      while (leftIndex < rightLength) {
+         if (left[leftIndex] != right[rightIndex]) {
+            return false;
+         }
+         leftIndex++;
+         rightIndex++;
+      }
+      return true;
+   }
+
+   /**
+    * Compares {@code left} against {@code right[rightOffset, rightOffset + rightLength)} through the
+    * {@link PlatformDependent} accessors: 8 bytes at a time first, then the remaining bytes one by one.
+    */
+   private static boolean equalsUnsafe(final byte[] left,
+                                       final byte[] right,
+                                       final int rightOffset,
+                                       final int rightLength) {
+      final int wordCount = rightLength >>> 3;
+      final int tailCount = rightLength & 7;
+      int leftIndex = 0;
+      int rightIndex = rightOffset;
+      //whole 8 bytes words
+      for (int word = 0; word < wordCount; word++, leftIndex += 8, rightIndex += 8) {
+         if (PlatformDependent.getLong(left, leftIndex) != PlatformDependent.getLong(right, rightIndex)) {
+            return false;
+         }
+      }
+      //remaining (at most 7) bytes
+      for (int tail = 0; tail < tailCount; tail++, leftIndex++, rightIndex++) {
+         if (PlatformDependent.getByte(left, leftIndex) != PlatformDependent.getByte(right, rightIndex)) {
+            return false;
+         }
+      }
+      return true;
+   }
+
+
+   /**
+    * Returns {@code true} if the content of {@code bytes} is equal to the {@code length} bytes of {@code byteBuf}
+    * starting at the absolute index {@code offset}, {@code false} otherwise.
+    * <p>
+    * The reader and writer indexes of {@code byteBuf} are not modified.
+    * <p>
+    * When {@code byteBuf} holds an encoded {@link SimpleString}, {@code offset} is expected to point right after
+    * the length field, ie where {@link SimpleString#readSimpleString(ByteBuf, int)} would start reading.
+    *
+    * @throws IndexOutOfBoundsException if the requested range is not within the bytes of {@code byteBuf}; the
+    *                                   plain loop path may instead return {@code false} on a mismatch found
+    *                                   before the out of bounds index is reached
+    */
+   public static boolean equals(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) {
+      if (bytes.length != length)
+         return false;
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         //overflow-safe form of (offset + length) > writerIndex that also rejects a negative offset:
+         //length is bytes.length here, hence never negative
+         if (offset < 0 || length > byteBuf.writerIndex() - offset) {
+            throw new IndexOutOfBoundsException();
+         }
+         if (byteBuf.hasArray()) {
+            return equals(bytes, byteBuf.array(), byteBuf.arrayOffset() + offset, length);
+         } else if (byteBuf.hasMemoryAddress()) {
+            return equalsOffHeap(bytes, byteBuf.memoryAddress(), offset, length);
+         }
+      }
+      //fallback that goes through the bounds-checked ByteBuf accessors
+      return equalsOnHeap(bytes, byteBuf, offset, length);
+   }
+
+   /**
+    * Byte by byte comparison of {@code bytes} against the {@code length} bytes of {@code byteBuf} starting at
+    * the absolute index {@code offset}, reading the buffer through {@link ByteBuf#getByte(int)}.
+    */
+   private static boolean equalsOnHeap(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) {
+      if (bytes.length != length) {
+         return false;
+      }
+      //from here on bytes.length == length: walking the array visits exactly length positions
+      int bufferIndex = offset;
+      for (final byte expected : bytes) {
+         if (expected != byteBuf.getByte(bufferIndex)) {
+            return false;
+         }
+         bufferIndex++;
+      }
+      return true;
+   }
+
+   /**
+    * Compares {@code bytes} against the {@code length} bytes found at the memory address
+    * {@code address + offset} through the {@link PlatformDependent} accessors: 8 bytes at a time first,
+    * then the remaining bytes one by one.
+    */
+   private static boolean equalsOffHeap(final byte[] bytes,
+                                        final long address,
+                                        final int offset,
+                                        final int length) {
+      final int wordCount = length >>> 3;
+      final int tailCount = length & 7;
+      int arrayIndex = 0;
+      long cursor = address + offset;
+      //whole 8 bytes words
+      for (int word = 0; word < wordCount; word++, arrayIndex += 8, cursor += 8) {
+         if (PlatformDependent.getLong(bytes, arrayIndex) != PlatformDependent.getLong(cursor)) {
+            return false;
+         }
+      }
+      //remaining (at most 7) bytes
+      for (int tail = 0; tail < tailCount; tail++, arrayIndex++, cursor++) {
+         if (PlatformDependent.getByte(bytes, arrayIndex) != PlatformDependent.getByte(cursor)) {
+            return false;
+         }
+      }
+      return true;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index a3e4876..56beb76 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -28,7 +28,7 @@ import io.netty.buffer.ByteBuf;
 import 
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
-import org.apache.activemq.artemis.utils.AbstractInterner;
+import org.apache.activemq.artemis.utils.AbstractByteBufPool;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 
@@ -332,8 +332,7 @@ public class TypedProperties {
    }
 
    public synchronized void decode(final ByteBuf buffer,
-                                   final SimpleString.Interner keyInterner,
-                                   final StringValue.Interner valueInterner) {
+                                   final TypedPropertiesDecoderPools 
keyValuePools) {
       byte b = buffer.readByte();
 
       if (b == DataConstants.NULL) {
@@ -346,15 +345,7 @@ public class TypedProperties {
          size = 0;
 
          for (int i = 0; i < numHeaders; i++) {
-            final SimpleString key;
-            int len = buffer.readInt();
-            if (keyInterner != null) {
-               key = keyInterner.intern(buffer, len);
-            } else {
-               byte[] data = new byte[len];
-               buffer.readBytes(data);
-               key = new SimpleString(data);
-            }
+            final SimpleString key = SimpleString.readSimpleString(buffer, 
keyValuePools == null ? null : keyValuePools.getPropertyKeysPool());
 
             byte type = buffer.readByte();
 
@@ -412,12 +403,7 @@ public class TypedProperties {
                   break;
                }
                case STRING: {
-                  if (valueInterner != null) {
-                     final int length = buffer.readInt();
-                     val = valueInterner.intern(buffer, length);
-                  } else {
-                     val = new StringValue(buffer);
-                  }
+                  val = StringValue.readStringValue(buffer, keyValuePools == 
null ? null : keyValuePools.getPropertyValuesPool());
                   doPutValue(key, val);
                   break;
                }
@@ -430,7 +416,7 @@ public class TypedProperties {
    }
 
    public synchronized void decode(final ByteBuf buffer) {
-      decode(buffer, null, null);
+      decode(buffer, null);
    }
 
    public synchronized void encode(final ByteBuf buffer) {
@@ -901,25 +887,61 @@ public class TypedProperties {
 
    public static final class StringValue extends PropertyValue {
 
-      public static final class Interner extends AbstractInterner<StringValue> 
{
+      final SimpleString val;
+
+      private StringValue(final SimpleString val) {
+         this.val = val;
+      }
+
+      static StringValue readStringValue(final ByteBuf byteBuf, 
ByteBufStringValuePool pool) {
+         if (pool == null) {
+            return new StringValue(SimpleString.readSimpleString(byteBuf));
+         } else {
+            return pool.getOrCreate(byteBuf);
+         }
+      }
+
+      @Override
+      public Object getValue() {
+         return val;
+      }
+
+      @Override
+      public void write(final ByteBuf buffer) {
+         buffer.writeByte(DataConstants.STRING);
+         SimpleString.writeSimpleString(buffer, val);
+      }
+
+      @Override
+      public int encodeSize() {
+         return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val);
+      }
+
+      public static final class ByteBufStringValuePool extends 
AbstractByteBufPool<StringValue> {
+
+         private static final int UUID_LENGTH = 36;
 
          private final int maxLength;
 
-         public Interner(final int capacity, final int maxCharsLength) {
+         public ByteBufStringValuePool() {
+            this.maxLength = UUID_LENGTH;
+         }
+
+         public ByteBufStringValuePool(final int capacity, final int 
maxCharsLength) {
             super(capacity);
             this.maxLength = maxCharsLength;
          }
 
          @Override
          protected boolean isEqual(final StringValue entry, final ByteBuf 
byteBuf, final int offset, final int length) {
-            if (entry == null) {
+            if (entry == null || entry.val == null) {
                return false;
             }
-            return SimpleString.isEqual(entry.val, byteBuf, offset, length);
+            return entry.val.equals(byteBuf, offset, length);
          }
 
          @Override
-         protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+         protected boolean canPool(final ByteBuf byteBuf, final int length) {
             assert length % 2 == 0 : "length must be a multiple of 2";
             final int expectedStringLength = length >> 1;
             return expectedStringLength <= maxLength;
@@ -930,31 +952,53 @@ public class TypedProperties {
             return new StringValue(SimpleString.readSimpleString(byteBuf, 
length));
          }
       }
+   }
 
-      final SimpleString val;
+   public static class TypedPropertiesDecoderPools {
 
-      private StringValue(final SimpleString val) {
-         this.val = val;
+      private SimpleString.ByteBufSimpleStringPool propertyKeysPool;
+      private TypedProperties.StringValue.ByteBufStringValuePool 
propertyValuesPool;
+
+      public TypedPropertiesDecoderPools() {
+         this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool();
+         this.propertyValuesPool = new 
TypedProperties.StringValue.ByteBufStringValuePool();
       }
 
-      private StringValue(final ByteBuf buffer) {
-         val = SimpleString.readSimpleString(buffer);
+      public TypedPropertiesDecoderPools(int keyPoolCapacity, int 
valuePoolCapacity, int maxCharsLength) {
+         this.propertyKeysPool = new 
SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength);
+         this.propertyValuesPool = new 
TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, 
maxCharsLength);
       }
 
-      @Override
-      public Object getValue() {
-         return val;
+      public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() {
+         return propertyKeysPool;
       }
 
-      @Override
-      public void write(final ByteBuf buffer) {
-         buffer.writeByte(DataConstants.STRING);
-         SimpleString.writeSimpleString(buffer, val);
+      public TypedProperties.StringValue.ByteBufStringValuePool 
getPropertyValuesPool() {
+         return propertyValuesPool;
       }
+   }
 
-      @Override
-      public int encodeSize() {
-         return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val);
+   public static class TypedPropertiesStringSimpleStringPools {
+
+      private SimpleString.StringSimpleStringPool propertyKeysPool;
+      private SimpleString.StringSimpleStringPool propertyValuesPool;
+
+      public TypedPropertiesStringSimpleStringPools() {
+         this.propertyKeysPool = new SimpleString.StringSimpleStringPool();
+         this.propertyValuesPool = new SimpleString.StringSimpleStringPool();
+      }
+
+      public TypedPropertiesStringSimpleStringPools(int keyPoolCapacity, int 
valuePoolCapacity) {
+         this.propertyKeysPool = new 
SimpleString.StringSimpleStringPool(keyPoolCapacity);
+         this.propertyValuesPool = new 
SimpleString.StringSimpleStringPool(valuePoolCapacity);
+      }
+
+      public SimpleString.StringSimpleStringPool getPropertyKeysPool() {
+         return propertyKeysPool;
+      }
+
+      public SimpleString.StringSimpleStringPool getPropertyValuesPool() {
+         return propertyValuesPool;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index ddb8a3b..d24cd95 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 
 /**
@@ -587,6 +588,8 @@ public interface Message {
 
    Message putStringProperty(SimpleString key, SimpleString value);
 
+   Message putStringProperty(SimpleString key, String value);
+
    /**
     * Returns the size of the <em>encoded</em> message.
     */
@@ -649,6 +652,9 @@ public interface Message {
    /** This should make you convert your message into Core format. */
    ICoreMessage toCore();
 
+   /** This should make you convert your message into Core format, with {@code coreMessageObjectPools} supplied to the conversion. */
+   ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools);
+
    int getMemoryEstimate();
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index 91fb6ca..8068aa9 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -59,6 +60,10 @@ public class ClientMessageImpl extends CoreMessage 
implements ClientMessageInter
    public ClientMessageImpl() {
    }
 
+   public ClientMessageImpl(CoreMessageObjectPools coreMessageObjectPools) {
+      super(coreMessageObjectPools);
+   }
+
    protected ClientMessageImpl(ClientMessageImpl other) {
       super(other);
    }
@@ -96,11 +101,22 @@ public class ClientMessageImpl extends CoreMessage 
implements ClientMessageInter
                             final long expiration,
                             final long timestamp,
                             final byte priority,
-                            final int initialMessageBufferSize) {
+                            final int initialMessageBufferSize,
+                            final CoreMessageObjectPools 
coreMessageObjectPools) {
+      super(coreMessageObjectPools);
       
this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable).
            setPriority(priority).initBuffer(initialMessageBufferSize);
    }
 
+   public ClientMessageImpl(final byte type,
+                            final boolean durable,
+                            final long expiration,
+                            final long timestamp,
+                            final byte priority,
+                            final int initialMessageBufferSize) {
+      this(type, durable, expiration, timestamp, priority, 
initialMessageBufferSize, null);
+   }
+
    @Override
    public TypedProperties getProperties() {
       return this.checkProperties();
@@ -286,6 +302,11 @@ public class ClientMessageImpl extends CoreMessage 
implements ClientMessageInter
    }
 
    @Override
+   public ClientMessageImpl putStringProperty(final SimpleString key, final 
String value) {
+      return (ClientMessageImpl) super.putStringProperty(key, value);
+   }
+
+   @Override
    public ClientMessageImpl putObjectProperty(final SimpleString key,
                                               final Object value) throws 
ActiveMQPropertyConversionException {
       return (ClientMessageImpl) super.putObjectProperty(key, value);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 61784ad..b5f8a1b 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -43,6 +43,7 @@ import 
org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -148,6 +149,8 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
 
    private final Executor closeExecutor;
 
+   private final CoreMessageObjectPools coreMessageObjectPools = new 
CoreMessageObjectPools();
+
    ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
                      final String name,
                      final String username,
@@ -869,7 +872,7 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                                       final long expiration,
                                       final long timestamp,
                                       final byte priority) {
-      return new ClientMessageImpl(type, durable, expiration, timestamp, 
priority, initialMessagePacketSize);
+      return new ClientMessageImpl(type, durable, expiration, timestamp, 
priority, initialMessagePacketSize, coreMessageObjectPools);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 4ebf97e..888b785 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -93,18 +93,14 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
 
    protected volatile TypedProperties properties;
 
-   private final SimpleString.Interner keysInterner;
-   private final TypedProperties.StringValue.Interner valuesInterner;
+   private final CoreMessageObjectPools coreMessageObjectPools;
 
-   public CoreMessage(final SimpleString.Interner keysInterner,
-                      final TypedProperties.StringValue.Interner 
valuesInterner) {
-      this.keysInterner = keysInterner;
-      this.valuesInterner = valuesInterner;
+   public CoreMessage(final CoreMessageObjectPools coreMessageObjectPools) {
+      this.coreMessageObjectPools = coreMessageObjectPools;
    }
 
    public CoreMessage() {
-      this.keysInterner = null;
-      this.valuesInterner = null;
+      this.coreMessageObjectPools = null;
    }
 
    /** On core there's no delivery annotation */
@@ -326,10 +322,13 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    }
 
    public CoreMessage(long id, int bufferSize) {
+      this(id, bufferSize, null);
+   }
+
+   public CoreMessage(long id, int bufferSize, CoreMessageObjectPools 
coreMessageObjectPools) {
       this.initBuffer(bufferSize);
       this.setMessageID(id);
-      this.keysInterner = null;
-      this.valuesInterner = null;
+      this.coreMessageObjectPools = coreMessageObjectPools;
    }
 
    protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
@@ -343,8 +342,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
       this.timestamp = other.timestamp;
       this.priority = other.priority;
       this.userID = other.userID;
-      this.keysInterner = other.keysInterner;
-      this.valuesInterner = other.valuesInterner;
+      this.coreMessageObjectPools = other.coreMessageObjectPools;
       if (copyProperties != null) {
          this.properties = new TypedProperties(copyProperties);
       }
@@ -424,7 +422,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
 
    @Override
    public CoreMessage setValidatedUserID(String validatedUserID) {
-      putStringProperty(Message.HDR_VALIDATED_USER, 
SimpleString.toSimpleString(validatedUserID));
+      putStringProperty(Message.HDR_VALIDATED_USER, 
SimpleString.toSimpleString(validatedUserID, getPropertyValuesPool()));
       return this;
    }
 
@@ -479,7 +477,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
          TypedProperties properties = new TypedProperties();
          if (buffer != null && propertiesLocation >= 0) {
             final ByteBuf byteBuf = 
buffer.duplicate().readerIndex(propertiesLocation);
-            properties.decode(byteBuf, keysInterner, valuesInterner);
+            properties.decode(byteBuf, coreMessageObjectPools == null ? null : 
coreMessageObjectPools.getPropertiesDecoderPools());
          }
          this.properties = properties;
       }
@@ -543,17 +541,8 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    private void decodeHeadersAndProperties(final ByteBuf buffer, boolean 
lazyProperties) {
       messageIDPosition = buffer.readerIndex();
       messageID = buffer.readLong();
-      int b = buffer.readByte();
-      if (b != DataConstants.NULL) {
-         final int length = buffer.readInt();
-         if (keysInterner != null) {
-            address = keysInterner.intern(buffer, length);
-         } else {
-            address = SimpleString.readSimpleString(buffer, length);
-         }
-      } else {
-         address = null;
-      }
+
+      address = SimpleString.readNullableSimpleString(buffer, 
coreMessageObjectPools == null ? null : 
coreMessageObjectPools.getAddressDecoderPool());
       if (buffer.readByte() == DataConstants.NOT_NULL) {
          byte[] bytes = new byte[16];
          buffer.readBytes(bytes);
@@ -571,7 +560,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
          propertiesLocation = buffer.readerIndex();
       } else {
          properties = new TypedProperties();
-         properties.decode(buffer, keysInterner, valuesInterner);
+         properties.decode(buffer, coreMessageObjectPools == null ? null : 
coreMessageObjectPools.getPropertiesDecoderPools());
       }
    }
 
@@ -671,7 +660,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    @Override
    public CoreMessage setAddress(String address) {
       messageChanged();
-      this.address = SimpleString.toSimpleString(address);
+      this.address = SimpleString.toSimpleString(address, 
coreMessageObjectPools == null ? null : 
coreMessageObjectPools.getAddressStringSimpleStringPool());
       return this;
    }
 
@@ -703,7 +692,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    public CoreMessage putBooleanProperty(final String key, final boolean 
value) {
       messageChanged();
       checkProperties();
-      properties.putBooleanProperty(new SimpleString(key), value);
+      properties.putBooleanProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), value);
       return this;
    }
 
@@ -724,7 +713,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    @Override
    public Boolean getBooleanProperty(final String key) throws 
ActiveMQPropertyConversionException {
       checkProperties();
-      return properties.getBooleanProperty(new SimpleString(key));
+      return properties.getBooleanProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()));
    }
 
    @Override
@@ -739,7 +728,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    public CoreMessage putByteProperty(final String key, final byte value) {
       messageChanged();
       checkProperties();
-      properties.putByteProperty(new SimpleString(key), value);
+      properties.putByteProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), value);
 
       return this;
    }
@@ -752,7 +741,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
 
    @Override
    public Byte getByteProperty(final String key) throws 
ActiveMQPropertyConversionException {
-      return getByteProperty(SimpleString.toSimpleString(key));
+      return getByteProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()));
    }
 
    @Override
@@ -768,7 +757,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    public CoreMessage putBytesProperty(final String key, final byte[] value) {
       messageChanged();
       checkProperties();
-      properties.putBytesProperty(new SimpleString(key), value);
+      properties.putBytesProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), value);
       return this;
    }
 
@@ -780,7 +769,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
 
    @Override
    public byte[] getBytesProperty(final String key) throws 
ActiveMQPropertyConversionException {
-      return getBytesProperty(new SimpleString(key));
+      return getBytesProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()));
    }
 
    @Override
@@ -795,7 +784,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    public CoreMessage putCharProperty(String key, char value) {
       messageChanged();
       checkProperties();
-      properties.putCharProperty(new SimpleString(key), value);
+      properties.putCharProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), value);
       return this;
    }
 
@@ -811,7 +800,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    public CoreMessage putShortProperty(final String key, final short value) {
       messageChanged();
       checkProperties();
-      properties.putShortProperty(new SimpleString(key), value);
+      properties.putShortProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), value);
       return this;
    }
 
@@ -827,7 +816,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    public CoreMessage putIntProperty(final String key, final int value) {
       messageChanged();
       checkProperties();
-      properties.putIntProperty(new SimpleString(key), value);
+      properties.putIntProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), value);
       return this;
    }
 
@@ -854,7 +843,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    public CoreMessage putLongProperty(final String key, final long value) {
       messageChanged();
       checkProperties();
-      properties.putLongProperty(new SimpleString(key), value);
+      properties.putLongProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), value);
       return this;
    }
 
@@ -882,7 +871,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    public CoreMessage putFloatProperty(final String key, final float value) {
       messageChanged();
       checkProperties();
-      properties.putFloatProperty(new SimpleString(key), value);
+      properties.putFloatProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), value);
       return this;
    }
 
@@ -898,7 +887,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    public CoreMessage putDoubleProperty(final String key, final double value) {
       messageChanged();
       checkProperties();
-      properties.putDoubleProperty(new SimpleString(key), value);
+      properties.putDoubleProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), value);
       return this;
    }
 
@@ -924,10 +913,19 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    }
 
    @Override
+   public CoreMessage putStringProperty(final SimpleString key, final String 
value) {
+      messageChanged();
+      checkProperties();
+      properties.putSimpleStringProperty(key, 
SimpleString.toSimpleString(value, getPropertyValuesPool()));
+      return this;
+   }
+
+
+   @Override
    public CoreMessage putStringProperty(final String key, final String value) {
       messageChanged();
       checkProperties();
-      properties.putSimpleStringProperty(new SimpleString(key), 
SimpleString.toSimpleString(value));
+      properties.putSimpleStringProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), SimpleString.toSimpleString(value, 
getPropertyValuesPool()));
       return this;
    }
 
@@ -943,7 +941,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    @Override
    public Object getObjectProperty(final String key) {
       checkProperties();
-      return getObjectProperty(SimpleString.toSimpleString(key));
+      return getObjectProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()));
    }
 
    @Override
@@ -955,7 +953,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    @Override
    public CoreMessage putObjectProperty(final String key, final Object value) 
throws ActiveMQPropertyConversionException {
       messageChanged();
-      putObjectProperty(new SimpleString(key), value);
+      putObjectProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()), value);
       return this;
    }
 
@@ -968,7 +966,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    @Override
    public Short getShortProperty(final String key) throws 
ActiveMQPropertyConversionException {
       checkProperties();
-      return properties.getShortProperty(new SimpleString(key));
+      return properties.getShortProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()));
    }
 
    @Override
@@ -980,7 +978,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    @Override
    public Float getFloatProperty(final String key) throws 
ActiveMQPropertyConversionException {
       checkProperties();
-      return properties.getFloatProperty(new SimpleString(key));
+      return properties.getFloatProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()));
    }
 
    @Override
@@ -996,7 +994,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
 
    @Override
    public String getStringProperty(final String key) throws 
ActiveMQPropertyConversionException {
-      return getStringProperty(new SimpleString(key));
+      return getStringProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()));
    }
 
    @Override
@@ -1008,7 +1006,7 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    @Override
    public SimpleString getSimpleStringProperty(final String key) throws 
ActiveMQPropertyConversionException {
       checkProperties();
-      return properties.getSimpleStringProperty(new SimpleString(key));
+      return 
properties.getSimpleStringProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()));
    }
 
    @Override
@@ -1025,7 +1023,7 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    public Object removeProperty(final String key) {
       messageChanged();
       checkProperties();
-      Object oldValue = properties.removeProperty(new SimpleString(key));
+      Object oldValue = 
properties.removeProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()));
       if (oldValue != null) {
          messageChanged();
       }
@@ -1041,7 +1039,7 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    @Override
    public boolean containsProperty(final String key) {
       checkProperties();
-      return properties.containsProperty(new SimpleString(key));
+      return properties.containsProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool()));
    }
 
    @Override
@@ -1116,6 +1114,11 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    }
 
    @Override
+   public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
+      return this;
+   }
+
+   @Override
    public String toString() {
       try {
          checkProperties();
@@ -1135,4 +1138,12 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
          return new java.util.Date(timestamp).toString();
       }
    }
+
+   private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
+      return coreMessageObjectPools == null ? null : 
coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
+   }
+
+   private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
+      return coreMessageObjectPools == null ? null : 
coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
new file mode 100644
index 0000000..d4e3ed1
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.message.impl;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
+
+public class CoreMessageObjectPools {
+
+   private Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool = 
Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
+   private Supplier<TypedProperties.TypedPropertiesDecoderPools> 
propertiesDecoderPools = 
Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
+
+   private Supplier<SimpleString.StringSimpleStringPool> 
groupIdStringSimpleStringPool = 
Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+   private Supplier<SimpleString.StringSimpleStringPool> 
addressStringSimpleStringPool = 
Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+   private Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> 
propertiesStringSimpleStringPools = 
Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
+
+   public CoreMessageObjectPools() {
+   }
+
+   public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() {
+      return addressDecoderPool.get();
+   }
+
+   public SimpleString.StringSimpleStringPool 
getAddressStringSimpleStringPool() {
+      return addressStringSimpleStringPool.get();
+   }
+
+   public SimpleString.StringSimpleStringPool 
getGroupIdStringSimpleStringPool() {
+      return groupIdStringSimpleStringPool.get();
+   }
+
+   public TypedProperties.TypedPropertiesDecoderPools 
getPropertiesDecoderPools() {
+      return propertiesDecoderPools.get();
+   }
+
+   public TypedProperties.TypedPropertiesStringSimpleStringPools 
getPropertiesStringSimpleStringPools() {
+      return propertiesStringSimpleStringPools.get();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
index 787e499..ad8c7a9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
@@ -32,11 +33,7 @@ import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
 public class ClientPacketDecoder extends PacketDecoder {
 
    private static final long serialVersionUID = 6952614096979334582L;
-   public static final ClientPacketDecoder INSTANCE = new 
ClientPacketDecoder();
-
-   protected ClientPacketDecoder() {
-
-   }
+   protected final CoreMessageObjectPools coreMessageObjectPools = new 
CoreMessageObjectPools();
 
    @Override
    public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection 
connection) {
@@ -56,9 +53,9 @@ public class ClientPacketDecoder extends PacketDecoder {
       switch (packetType) {
          case SESS_RECEIVE_MSG: {
             if (connection.isVersionBeforeAddressChange()) {
-               packet = new SessionReceiveMessage_1X(new ClientMessageImpl());
+               packet = new SessionReceiveMessage_1X(new 
ClientMessageImpl(coreMessageObjectPools));
             } else {
-               packet = new SessionReceiveMessage(new ClientMessageImpl());
+               packet = new SessionReceiveMessage(new 
ClientMessageImpl(coreMessageObjectPools));
             }
             break;
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index f0005ff..c58a0bd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -511,7 +511,7 @@ public class ActiveMQClientProtocolManager implements 
ClientProtocolManager {
    }
 
    protected PacketDecoder createPacketDecoder() {
-      return ClientPacketDecoder.INSTANCE;
+      return new ClientPacketDecoder();
    }
 
    private void forceReturnChannel1(ActiveMQException cause) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index 2660f96..e8f5920 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -109,12 +109,18 @@ public class MessageUtil {
       return message.getSimpleStringProperty(REPLYTO_HEADER_NAME);
    }
 
-   public static void setJMSReplyTo(Message message, final SimpleString dest) {
-
+   public static void setJMSReplyTo(Message message, final String dest) {
       if (dest == null) {
          message.removeProperty(REPLYTO_HEADER_NAME);
       } else {
+         message.putStringProperty(REPLYTO_HEADER_NAME, dest);
+      }
+   }
 
+   public static void setJMSReplyTo(Message message, final SimpleString dest) {
+      if (dest == null) {
+         message.removeProperty(REPLYTO_HEADER_NAME);
+      } else {
          message.putStringProperty(REPLYTO_HEADER_NAME, dest);
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
index ec94011..310b4ed 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
@@ -337,7 +337,7 @@ public class CoreMessageTest {
 
    public String generate(String body) throws Exception {
 
-      ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, 
EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024);
+      ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, 
EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024, null);
       TextMessageUtil.writeBodyText(message.getBodyBuffer(), 
SimpleString.toSimpleString(body));
 
       message.setAddress(ADDRESS);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index 626dd4d..7750564 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -99,26 +99,28 @@ public class ActiveMQDestination extends JNDIStorable 
implements Destination, Se
       }
    }
 
-   public static String createQueueNameForSubscription(final boolean isDurable,
+   public static SimpleString createQueueNameForSubscription(final boolean 
isDurable,
                                                        final String clientID,
                                                        final String 
subscriptionName) {
+      final String queueName;
       if (clientID != null) {
          if (isDurable) {
-            return ActiveMQDestination.escape(clientID) + SEPARATOR +
+            queueName = ActiveMQDestination.escape(clientID) + SEPARATOR +
                ActiveMQDestination.escape(subscriptionName);
          } else {
-            return "nonDurable" + SEPARATOR +
+            queueName = "nonDurable" + SEPARATOR +
                ActiveMQDestination.escape(clientID) + SEPARATOR +
                ActiveMQDestination.escape(subscriptionName);
          }
       } else {
          if (isDurable) {
-            return ActiveMQDestination.escape(subscriptionName);
+            queueName = ActiveMQDestination.escape(subscriptionName);
          } else {
-            return "nonDurable" + SEPARATOR +
+            queueName = "nonDurable" + SEPARATOR +
                ActiveMQDestination.escape(subscriptionName);
          }
       }
+      return SimpleString.toSimpleString(queueName);
    }
 
    public static String createQueueNameForSharedSubscription(final boolean 
isDurable,
@@ -192,10 +194,18 @@ public class ActiveMQDestination extends JNDIStorable 
implements Destination, Se
       return new ActiveMQQueue(address);
    }
 
+   public static ActiveMQQueue createQueue(final SimpleString address) {
+      return new ActiveMQQueue(address);
+   }
+
    public static ActiveMQTopic createTopic(final String address) {
       return new ActiveMQTopic(address);
    }
 
+   public static ActiveMQTopic createTopic(final SimpleString address) {
+      return new ActiveMQTopic(address);
+   }
+
    public static ActiveMQTemporaryQueue createTemporaryQueue(final String 
address, final ActiveMQSession session) {
       return new ActiveMQTemporaryQueue(address, session);
    }

Reply via email to