ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol

The commit contains:
- a general purpose interner implementation
- StringValue/SimpleString interner specializations
- TypedProperties keys/values string interning for SessionSendMessage decoding


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8d776edd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8d776edd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8d776edd

Branch: refs/heads/master
Commit: 8d776eddfcc12bfc73771c04e376583c9fa221e1
Parents: 00bd989
Author: Francesco Nigro <[email protected]>
Authored: Thu Jan 4 15:22:05 2018 +0100
Committer: Michael Pearce <[email protected]>
Committed: Wed Jan 17 09:33:41 2018 +0100

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java | 138 +++++++++++++++-
 .../artemis/utils/AbstractInterner.java         | 157 +++++++++++++++++++
 .../utils/collections/TypedProperties.java      |  60 ++++++-
 .../artemis/core/message/impl/CoreMessage.java  |  36 ++++-
 .../core/protocol/ClientPacketDecoder.java      |   4 +
 .../impl/ActiveMQClientProtocolManager.java     |   4 +-
 .../core/protocol/ServerPacketDecoder.java      |  29 +++-
 .../protocol/core/impl/CoreProtocolManager.java |   2 +-
 ...ctiveMQServerSideProtocolManagerFactory.java |   4 +-
 9 files changed, 404 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index 79909c7..e24e245 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.utils.AbstractInterner;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
@@ -31,6 +33,129 @@ import org.apache.activemq.artemis.utils.DataConstants;
  */
 public final class SimpleString implements CharSequence, Serializable, 
Comparable<SimpleString> {
 
+   public static final class Interner extends AbstractInterner<SimpleString> {
+
+      private final int maxLength;
+
+      public Interner(final int capacity, final int maxCharsLength) {
+         super(capacity);
+         this.maxLength = maxCharsLength;
+      }
+
+      @Override
+      protected boolean isEqual(final SimpleString entry, final ByteBuf 
byteBuf, final int offset, final int length) {
+         return SimpleString.isEqual(entry, byteBuf, offset, length);
+      }
+
+      @Override
+      protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+         assert length % 2 == 0 : "length must be a multiple of 2";
+         final int expectedStringLength = length >> 1;
+         return expectedStringLength <= maxLength;
+      }
+
+      @Override
+      protected SimpleString create(final ByteBuf byteBuf, final int length) {
+         return readSimpleString(byteBuf, length);
+      }
+   }
+
+   /**
+    * Returns {@code true} if the {@link SimpleString} content encoded in 
{@code bytes} is equal to {@code s},
+    * {@code false} otherwise.
+    * <p>
+    * It assumes that the {@code bytes} content is read using {@link 
SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
+    * length field.
+    */
+   public static boolean isEqual(final SimpleString s, final ByteBuf bytes, 
final int offset, final int length) {
+      if (s == null) {
+         return false;
+      }
+      final byte[] chars = s.getData();
+      if (chars.length != length)
+         return false;
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         if ((offset + length) > bytes.writerIndex()) {
+            throw new IndexOutOfBoundsException();
+         }
+         if (bytes.hasArray()) {
+            return batchOnHeapIsEqual(chars, bytes.array(), 
bytes.arrayOffset() + offset, length);
+         } else if (bytes.hasMemoryAddress()) {
+            return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, 
length);
+         }
+      }
+      return byteBufIsEqual(chars, bytes, offset, length);
+   }
+
+   private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf 
bytes, final int offset, final int length) {
+      for (int i = 0; i < length; i++)
+         if (chars[i] != bytes.getByte(offset + i))
+            return false;
+      return true;
+   }
+
+   private static boolean batchOnHeapIsEqual(final byte[] chars,
+                                             final byte[] array,
+                                             final int arrayOffset,
+                                             final int length) {
+      final int longCount = length >>> 3;
+      final int bytesCount = length & 7;
+      int bytesIndex = arrayOffset;
+      int charsIndex = 0;
+      for (int i = 0; i < longCount; i++) {
+         final long charsLong = PlatformDependent.getLong(chars, charsIndex);
+         final long bytesLong = PlatformDependent.getLong(array, bytesIndex);
+         if (charsLong != bytesLong) {
+            return false;
+
+         }
+         bytesIndex += 8;
+         charsIndex += 8;
+      }
+      for (int i = 0; i < bytesCount; i++) {
+         final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
+         final byte bytesByte = PlatformDependent.getByte(array, bytesIndex);
+         if (charsByte != bytesByte) {
+            return false;
+
+         }
+         bytesIndex++;
+         charsIndex++;
+      }
+      return true;
+   }
+
+   private static boolean batchOffHeapIsEqual(final byte[] chars,
+                                              final long address,
+                                              final int offset,
+                                              final int length) {
+      final int longCount = length >>> 3;
+      final int bytesCount = length & 7;
+      long bytesAddress = address + offset;
+      int charsIndex = 0;
+      for (int i = 0; i < longCount; i++) {
+         final long charsLong = PlatformDependent.getLong(chars, charsIndex);
+         final long bytesLong = PlatformDependent.getLong(bytesAddress);
+         if (charsLong != bytesLong) {
+            return false;
+
+         }
+         bytesAddress += 8;
+         charsIndex += 8;
+      }
+      for (int i = 0; i < bytesCount; i++) {
+         final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
+         final byte bytesByte = PlatformDependent.getByte(bytesAddress);
+         if (charsByte != bytesByte) {
+            return false;
+
+         }
+         bytesAddress++;
+         charsIndex++;
+      }
+      return true;
+   }
+
    private static final long serialVersionUID = 4204223851422244307L;
 
    // Attributes
@@ -134,7 +259,6 @@ public final class SimpleString implements CharSequence, 
Serializable, Comparabl
       return subSeq(start, end);
    }
 
-
    public static SimpleString readNullableSimpleString(ByteBuf buffer) {
       int b = buffer.readByte();
       if (b == DataConstants.NULL) {
@@ -143,13 +267,13 @@ public final class SimpleString implements CharSequence, 
Serializable, Comparabl
       return readSimpleString(buffer);
    }
 
-
    public static SimpleString readSimpleString(ByteBuf buffer) {
       int len = buffer.readInt();
-      if (len > buffer.readableBytes()) {
-         throw new IndexOutOfBoundsException();
-      }
-      byte[] data = new byte[len];
+      return readSimpleString(buffer, len);
+   }
+
+   public static SimpleString readSimpleString(final ByteBuf buffer, final int 
length) {
+      byte[] data = new byte[length];
       buffer.readBytes(data);
       return new SimpleString(data);
    }
@@ -169,8 +293,6 @@ public final class SimpleString implements CharSequence, 
Serializable, Comparabl
       buffer.writeBytes(data);
    }
 
-
-
    public SimpleString subSeq(final int start, final int end) {
       int len = data.length >> 1;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
new file mode 100644
index 0000000..7e1fe40
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.MathUtil;
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Thread-safe {@code <T>} interner.
+ * <p>
+ * Differently from {@link String#intern()} it contains a fixed amount of 
entries and
+ * when used by concurrent threads it doesn't ensure the uniqueness of the 
entries ie
+ * the same entry could be allocated multiple times by concurrent calls.
+ */
+public abstract class AbstractInterner<T> {
+
+   private final T[] entries;
+   private final int mask;
+   private final int shift;
+
+   public AbstractInterner(final int capacity) {
+      entries = (T[]) new 
Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
+      mask = entries.length - 1;
+      //log2 of entries.length
+      shift = 31 - Integer.numberOfLeadingZeros(entries.length);
+   }
+
+   /**
+    * Batch hash code implementation that works at its best if {@code bytes}
+    * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} 
encoded.
+    */
+   private static int hashCode(final ByteBuf bytes, final int offset, final 
int length) {
+      if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
+         //if the platform allows it, the hash code could be computed without 
bounds checking
+         if (bytes.hasArray()) {
+            return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, 
length);
+         } else if (bytes.hasMemoryAddress()) {
+            return offHeapHashCode(bytes.memoryAddress(), offset, length);
+         }
+      }
+      return byteBufHashCode(bytes, offset, length);
+   }
+
+   private static int onHeapHashCode(final byte[] bytes, final int offset, 
final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, 
arrayIndex);
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, 
arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   private static int offHeapHashCode(final long address, final int offset, 
final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getShort(address + 
arrayIndex);
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + PlatformDependent.getByte(address + 
arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, 
final int length) {
+      final int intCount = length >>> 1;
+      final int byteCount = length & 1;
+      int hashCode = 1;
+      int arrayIndex = offset;
+      for (int i = 0; i < intCount; i++) {
+         final short shortLE = byteBuf.getShortLE(arrayIndex);
+         final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? 
Short.reverseBytes(shortLE) : shortLE;
+         hashCode = 31 * hashCode + nativeShort;
+         arrayIndex += 2;
+      }
+      for (int i = 0; i < byteCount; i++) {
+         hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
+      }
+      return hashCode;
+   }
+
+   /**
+    * Returns {@code true} if {@code length}'s {@code byteBuf} content from 
{@link ByteBuf#readerIndex()} can be interned,
+    * {@code false} otherwise.
+    */
+   protected abstract boolean canIntern(ByteBuf byteBuf, int length);
+
+   /**
+    * Create a new entry.
+    */
+   protected abstract T create(ByteBuf byteBuf, int length);
+
+   /**
+    * Returns {@code true} if the {@code entry} content is the same of {@code 
byteBuf} at the specified {@code offset}
+    * and {@code length} {@code false} otherwise.
+    */
+   protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, 
int length);
+
+   /**
+    * Returns an interned entry if possible, a new one otherwise.
+    * <p>
+    * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by 
{@code length} after it.
+    */
+   public final T intern(final ByteBuf byteBuf, final int length) {
+      if (!canIntern(byteBuf, length)) {
+         return create(byteBuf, length);
+      } else {
+         if (!byteBuf.isReadable(length)) {
+            throw new IndexOutOfBoundsException();
+         }
+         final int bytesOffset = byteBuf.readerIndex();
+         final int hashCode = hashCode(byteBuf, bytesOffset, length);
+         //fast % operation with power of 2 entries.length
+         final int firstIndex = hashCode & mask;
+         final T firstEntry = entries[firstIndex];
+         if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
+            byteBuf.skipBytes(length);
+            return firstEntry;
+         }
+         final int secondIndex = (hashCode >> shift) & mask;
+         final T secondEntry = entries[secondIndex];
+         if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
+            byteBuf.skipBytes(length);
+            return secondEntry;
+         }
+         final T internedEntry = create(byteBuf, length);
+         final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
+         entries[entryIndex] = internedEntry;
+         return internedEntry;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index b17156e..a3e4876 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
 import 
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
+import org.apache.activemq.artemis.utils.AbstractInterner;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 
@@ -94,6 +95,7 @@ public class TypedProperties {
 
    public void putByteProperty(final SimpleString key, final byte value) {
       checkCreateProperties();
+      checkCreateProperties();
       doPutValue(key, ByteValue.valueOf(value));
    }
 
@@ -329,7 +331,9 @@ public class TypedProperties {
       }
    }
 
-   public synchronized void decode(final ByteBuf buffer) {
+   public synchronized void decode(final ByteBuf buffer,
+                                   final SimpleString.Interner keyInterner,
+                                   final StringValue.Interner valueInterner) {
       byte b = buffer.readByte();
 
       if (b == DataConstants.NULL) {
@@ -342,10 +346,15 @@ public class TypedProperties {
          size = 0;
 
          for (int i = 0; i < numHeaders; i++) {
+            final SimpleString key;
             int len = buffer.readInt();
-            byte[] data = new byte[len];
-            buffer.readBytes(data);
-            SimpleString key = new SimpleString(data);
+            if (keyInterner != null) {
+               key = keyInterner.intern(buffer, len);
+            } else {
+               byte[] data = new byte[len];
+               buffer.readBytes(data);
+               key = new SimpleString(data);
+            }
 
             byte type = buffer.readByte();
 
@@ -403,7 +412,12 @@ public class TypedProperties {
                   break;
                }
                case STRING: {
-                  val = new StringValue(buffer);
+                  if (valueInterner != null) {
+                     final int length = buffer.readInt();
+                     val = valueInterner.intern(buffer, length);
+                  } else {
+                     val = new StringValue(buffer);
+                  }
                   doPutValue(key, val);
                   break;
                }
@@ -415,6 +429,10 @@ public class TypedProperties {
       }
    }
 
+   public synchronized void decode(final ByteBuf buffer) {
+      decode(buffer, null, null);
+   }
+
    public synchronized void encode(final ByteBuf buffer) {
       if (properties == null) {
          buffer.writeByte(DataConstants.NULL);
@@ -881,7 +899,37 @@ public class TypedProperties {
       }
    }
 
-   private static final class StringValue extends PropertyValue {
+   public static final class StringValue extends PropertyValue {
+
+      public static final class Interner extends AbstractInterner<StringValue> 
{
+
+         private final int maxLength;
+
+         public Interner(final int capacity, final int maxCharsLength) {
+            super(capacity);
+            this.maxLength = maxCharsLength;
+         }
+
+         @Override
+         protected boolean isEqual(final StringValue entry, final ByteBuf 
byteBuf, final int offset, final int length) {
+            if (entry == null) {
+               return false;
+            }
+            return SimpleString.isEqual(entry.val, byteBuf, offset, length);
+         }
+
+         @Override
+         protected boolean canIntern(final ByteBuf byteBuf, final int length) {
+            assert length % 2 == 0 : "length must be a multiple of 2";
+            final int expectedStringLength = length >> 1;
+            return expectedStringLength <= maxLength;
+         }
+
+         @Override
+         protected StringValue create(final ByteBuf byteBuf, final int length) 
{
+            return new StringValue(SimpleString.readSimpleString(byteBuf, 
length));
+         }
+      }
 
       final SimpleString val;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index b0656b6..4ebf97e 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.Set;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -42,8 +43,6 @@ import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-
 /** Note: you shouldn't change properties using multi-threads. Change your 
properties before you can send it to multiple
  *  consumers */
 public class CoreMessage extends RefCountMessage implements ICoreMessage {
@@ -94,7 +93,18 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
 
    protected volatile TypedProperties properties;
 
+   private final SimpleString.Interner keysInterner;
+   private final TypedProperties.StringValue.Interner valuesInterner;
+
+   public CoreMessage(final SimpleString.Interner keysInterner,
+                      final TypedProperties.StringValue.Interner 
valuesInterner) {
+      this.keysInterner = keysInterner;
+      this.valuesInterner = valuesInterner;
+   }
+
    public CoreMessage() {
+      this.keysInterner = null;
+      this.valuesInterner = null;
    }
 
    /** On core there's no delivery annotation */
@@ -318,6 +328,8 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    public CoreMessage(long id, int bufferSize) {
       this.initBuffer(bufferSize);
       this.setMessageID(id);
+      this.keysInterner = null;
+      this.valuesInterner = null;
    }
 
    protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
@@ -331,6 +343,8 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
       this.timestamp = other.timestamp;
       this.priority = other.priority;
       this.userID = other.userID;
+      this.keysInterner = other.keysInterner;
+      this.valuesInterner = other.valuesInterner;
       if (copyProperties != null) {
          this.properties = new TypedProperties(copyProperties);
       }
@@ -464,7 +478,8 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
       if (properties == null) {
          TypedProperties properties = new TypedProperties();
          if (buffer != null && propertiesLocation >= 0) {
-            
properties.decode(buffer.duplicate().readerIndex(propertiesLocation));
+            final ByteBuf byteBuf = 
buffer.duplicate().readerIndex(propertiesLocation);
+            properties.decode(byteBuf, keysInterner, valuesInterner);
          }
          this.properties = properties;
       }
@@ -528,8 +543,17 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    private void decodeHeadersAndProperties(final ByteBuf buffer, boolean 
lazyProperties) {
       messageIDPosition = buffer.readerIndex();
       messageID = buffer.readLong();
-
-      address = SimpleString.readNullableSimpleString(buffer);
+      int b = buffer.readByte();
+      if (b != DataConstants.NULL) {
+         final int length = buffer.readInt();
+         if (keysInterner != null) {
+            address = keysInterner.intern(buffer, length);
+         } else {
+            address = SimpleString.readSimpleString(buffer, length);
+         }
+      } else {
+         address = null;
+      }
       if (buffer.readByte() == DataConstants.NOT_NULL) {
          byte[] bytes = new byte[16];
          buffer.readBytes(bytes);
@@ -547,7 +571,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
          propertiesLocation = buffer.readerIndex();
       } else {
          properties = new TypedProperties();
-         properties.decode(buffer);
+         properties.decode(buffer, keysInterner, valuesInterner);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
index 1022030..787e499 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
@@ -34,6 +34,10 @@ public class ClientPacketDecoder extends PacketDecoder {
    private static final long serialVersionUID = 6952614096979334582L;
    public static final ClientPacketDecoder INSTANCE = new 
ClientPacketDecoder();
 
+   protected ClientPacketDecoder() {
+
+   }
+
    @Override
    public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection 
connection) {
       final byte packetType = in.readByte();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index 93432b8..f0005ff 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -409,7 +409,7 @@ public class ActiveMQClientProtocolManager implements 
ClientProtocolManager {
                                      List<Interceptor> incomingInterceptors,
                                      List<Interceptor> outgoingInterceptors,
                                      TopologyResponseHandler 
topologyResponseHandler) {
-      this.connection = new RemotingConnectionImpl(getPacketDecoder(), 
transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, 
outgoingInterceptors);
+      this.connection = new RemotingConnectionImpl(createPacketDecoder(), 
transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, 
outgoingInterceptors);
 
       this.topologyResponseHandler = topologyResponseHandler;
 
@@ -510,7 +510,7 @@ public class ActiveMQClientProtocolManager implements 
ClientProtocolManager {
       }
    }
 
-   protected PacketDecoder getPacketDecoder() {
+   protected PacketDecoder createPacketDecoder() {
       return ClientPacketDecoder.INSTANCE;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 0584476..2276fdb 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.protocol;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -53,6 +54,7 @@ import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
 
 import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
 import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -83,16 +85,34 @@ import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
 
 public class ServerPacketDecoder extends ClientPacketDecoder {
 
+   private static final int UUID_LENGTH = 36;
+   private static final int DEFAULT_INTERNER_CAPACITY = 32;
    private static final long serialVersionUID = 3348673114388400766L;
-   public static final ServerPacketDecoder INSTANCE = new 
ServerPacketDecoder();
+   private SimpleString.Interner keysInterner;
+   private TypedProperties.StringValue.Interner valuesInterner;
 
-   private static SessionSendMessage decodeSessionSendMessage(final 
ActiveMQBuffer in, CoreRemotingConnection connection) {
+   public ServerPacketDecoder() {
+      this.keysInterner = null;
+      this.valuesInterner = null;
+   }
+
+   private void initializeInternersIfNeeded() {
+      if (this.keysInterner == null) {
+         this.keysInterner = new 
SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
+      }
+      if (this.valuesInterner == null) {
+         this.valuesInterner = new 
TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
+      }
+   }
+
+   private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer 
in, CoreRemotingConnection connection) {
       final SessionSendMessage sendMessage;
 
+      initializeInternersIfNeeded();
       if (connection.isVersionBeforeAddressChange()) {
-         sendMessage = new SessionSendMessage_1X(new CoreMessage());
+         sendMessage = new SessionSendMessage_1X(new 
CoreMessage(this.keysInterner, this.valuesInterner));
       } else {
-         sendMessage = new SessionSendMessage(new CoreMessage());
+         sendMessage = new SessionSendMessage(new 
CoreMessage(this.keysInterner, this.valuesInterner));
       }
 
       sendMessage.decode(in);
@@ -259,5 +279,4 @@ public class ServerPacketDecoder extends 
ClientPacketDecoder {
 
       return packet;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index c9262fa..af9e131 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -116,7 +116,7 @@ public class CoreProtocolManager implements 
ProtocolManager<Interceptor> {
 
       Executor connectionExecutor = server.getExecutorFactory().getExecutor();
 
-      final CoreRemotingConnection rc = new 
RemotingConnectionImpl(ServerPacketDecoder.INSTANCE, connection, 
incomingInterceptors, outgoingInterceptors, 
config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, 
server.getNodeID());
+      final CoreRemotingConnection rc = new RemotingConnectionImpl(new 
ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, 
config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, 
server.getNodeID());
 
       Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d776edd/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
index 85ad3a3..209f68f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
@@ -65,8 +65,8 @@ public class ActiveMQServerSideProtocolManagerFactory 
implements ClientProtocolM
    class ActiveMQReplicationProtocolManager extends 
ActiveMQClientProtocolManager {
 
       @Override
-      protected PacketDecoder getPacketDecoder() {
-         return ServerPacketDecoder.INSTANCE;
+      protected PacketDecoder createPacketDecoder() {
+         return new ServerPacketDecoder();
       }
    }
 }

Reply via email to