ARTEMIS-1586 Refactor to make more generic * Move byte util code into ByteUtil * Re-use the new equals method in SimpleString * Apply same pools/interners to client decode * Create String to SimpleString pools/interners for property access via String keys (producer and consumer benefits) * Lazily initialise the pools within the get methods of CoreMessageObjectPools that return each specific pool, to avoid having this scattered everywhere. * Reduce SimpleString creation in conversion to/from core message methods with JMS wrapper. * Reduce SimpleString creation in conversion to/from Core in OpenWire, AMQP, MQTT.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/98028cde Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/98028cde Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/98028cde Branch: refs/heads/master Commit: 98028cdecc6bd31c86a7d6decfed1961e46be7b2 Parents: 8d776ed Author: Michael André Pearce <[email protected]> Authored: Wed Jan 10 08:48:14 2018 +0000 Committer: Michael Pearce <[email protected]> Committed: Wed Jan 17 09:33:41 2018 +0100 ---------------------------------------------------------------------- .../activemq/artemis/api/core/SimpleString.java | 237 ++++++++----------- .../artemis/utils/AbstractByteBufPool.java | 164 +++++++++++++ .../artemis/utils/AbstractInterner.java | 157 ------------ .../activemq/artemis/utils/AbstractPool.java | 89 +++++++ .../apache/activemq/artemis/utils/ByteUtil.java | 122 ++++++++++ .../utils/collections/TypedProperties.java | 122 +++++++--- .../activemq/artemis/api/core/Message.java | 6 + .../core/client/impl/ClientMessageImpl.java | 23 +- .../core/client/impl/ClientSessionImpl.java | 5 +- .../artemis/core/message/impl/CoreMessage.java | 107 +++++---- .../message/impl/CoreMessageObjectPools.java | 55 +++++ .../core/protocol/ClientPacketDecoder.java | 11 +- .../impl/ActiveMQClientProtocolManager.java | 2 +- .../activemq/artemis/reader/MessageUtil.java | 10 +- .../artemis/message/CoreMessageTest.java | 2 +- .../artemis/jms/client/ActiveMQDestination.java | 20 +- .../artemis/jms/client/ActiveMQMessage.java | 55 ++--- .../artemis/jms/client/ActiveMQQueue.java | 8 +- .../artemis/jms/client/ActiveMQSession.java | 6 +- .../jms/client/ActiveMQStreamMessage.java | 2 +- .../artemis/jms/client/ActiveMQTopic.java | 8 +- .../protocol/amqp/broker/AMQPMessage.java | 74 ++++-- .../amqp/broker/AMQPSessionCallback.java | 105 ++++---- .../protocol/amqp/converter/AMQPConverter.java | 5 +- 
.../amqp/converter/AMQPMessageSupport.java | 49 ++-- .../amqp/converter/AmqpCoreConverter.java | 37 +-- .../proton/ProtonServerReceiverContext.java | 16 +- .../amqp/proton/ProtonServerSenderContext.java | 54 ++--- .../JMSMappingOutboundTransformerTest.java | 4 +- .../artemis/core/protocol/mqtt/MQTTSession.java | 8 +- .../core/protocol/mqtt/MQTTSessionCallback.java | 2 +- .../artemis/core/protocol/mqtt/MQTTUtil.java | 19 +- .../protocol/openwire/OpenWireConnection.java | 2 +- .../openwire/OpenWireMessageConverter.java | 8 +- .../core/protocol/openwire/OpenwireMessage.java | 11 + .../core/protocol/openwire/amq/AMQConsumer.java | 2 +- .../core/protocol/openwire/amq/AMQSession.java | 11 +- .../core/protocol/stomp/StompSession.java | 2 +- .../ra/inflow/ActiveMQMessageHandler.java | 2 +- .../core/protocol/ServerPacketDecoder.java | 25 +- .../protocol/core/impl/CoreSessionCallback.java | 11 +- .../core/server/impl/ServerConsumerImpl.java | 2 +- .../spi/core/protocol/MessageConverter.java | 3 +- .../spi/core/protocol/SessionCallback.java | 2 +- .../impl/ScheduledDeliveryHandlerTest.java | 11 + .../integration/client/AcknowledgeTest.java | 11 + .../integration/client/HangConsumerTest.java | 2 +- .../persistence/XmlImportExportTest.java | 2 +- 48 files changed, 1048 insertions(+), 643 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java index e24e245..dbf7468 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java +++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java @@ -21,8 +21,9 @@ import java.util.ArrayList; import java.util.List; import io.netty.buffer.ByteBuf; -import io.netty.util.internal.PlatformDependent; -import org.apache.activemq.artemis.utils.AbstractInterner; +import org.apache.activemq.artemis.utils.AbstractByteBufPool; +import org.apache.activemq.artemis.utils.AbstractPool; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; /** @@ -33,129 +34,6 @@ import org.apache.activemq.artemis.utils.DataConstants; */ public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> { - public static final class Interner extends AbstractInterner<SimpleString> { - - private final int maxLength; - - public Interner(final int capacity, final int maxCharsLength) { - super(capacity); - this.maxLength = maxCharsLength; - } - - @Override - protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) { - return SimpleString.isEqual(entry, byteBuf, offset, length); - } - - @Override - protected boolean canIntern(final ByteBuf byteBuf, final int length) { - assert length % 2 == 0 : "length must be a multiple of 2"; - final int expectedStringLength = length >> 1; - return expectedStringLength <= maxLength; - } - - @Override - protected SimpleString create(final ByteBuf byteBuf, final int length) { - return readSimpleString(byteBuf, length); - } - } - - /** - * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s}, - * {@code false} otherwise. - * <p> - * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the - * length field. 
- */ - public static boolean isEqual(final SimpleString s, final ByteBuf bytes, final int offset, final int length) { - if (s == null) { - return false; - } - final byte[] chars = s.getData(); - if (chars.length != length) - return false; - if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) { - if ((offset + length) > bytes.writerIndex()) { - throw new IndexOutOfBoundsException(); - } - if (bytes.hasArray()) { - return batchOnHeapIsEqual(chars, bytes.array(), bytes.arrayOffset() + offset, length); - } else if (bytes.hasMemoryAddress()) { - return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, length); - } - } - return byteBufIsEqual(chars, bytes, offset, length); - } - - private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf bytes, final int offset, final int length) { - for (int i = 0; i < length; i++) - if (chars[i] != bytes.getByte(offset + i)) - return false; - return true; - } - - private static boolean batchOnHeapIsEqual(final byte[] chars, - final byte[] array, - final int arrayOffset, - final int length) { - final int longCount = length >>> 3; - final int bytesCount = length & 7; - int bytesIndex = arrayOffset; - int charsIndex = 0; - for (int i = 0; i < longCount; i++) { - final long charsLong = PlatformDependent.getLong(chars, charsIndex); - final long bytesLong = PlatformDependent.getLong(array, bytesIndex); - if (charsLong != bytesLong) { - return false; - - } - bytesIndex += 8; - charsIndex += 8; - } - for (int i = 0; i < bytesCount; i++) { - final byte charsByte = PlatformDependent.getByte(chars, charsIndex); - final byte bytesByte = PlatformDependent.getByte(array, bytesIndex); - if (charsByte != bytesByte) { - return false; - - } - bytesIndex++; - charsIndex++; - } - return true; - } - - private static boolean batchOffHeapIsEqual(final byte[] chars, - final long address, - final int offset, - final int length) { - final int longCount = length >>> 3; - final int bytesCount = length & 7; - long 
bytesAddress = address + offset; - int charsIndex = 0; - for (int i = 0; i < longCount; i++) { - final long charsLong = PlatformDependent.getLong(chars, charsIndex); - final long bytesLong = PlatformDependent.getLong(bytesAddress); - if (charsLong != bytesLong) { - return false; - - } - bytesAddress += 8; - charsIndex += 8; - } - for (int i = 0; i < bytesCount; i++) { - final byte charsByte = PlatformDependent.getByte(chars, charsIndex); - final byte bytesByte = PlatformDependent.getByte(bytesAddress); - if (charsByte != bytesByte) { - return false; - - } - bytesAddress++; - charsIndex++; - } - return true; - } - private static final long serialVersionUID = 4204223851422244307L; // Attributes @@ -185,6 +63,13 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl return new SimpleString(string); } + public static SimpleString toSimpleString(final String string, StringSimpleStringPool pool) { + if (pool == null) { + return toSimpleString(string); + } + return pool.getOrCreate(string); + } + // Constructors // ---------------------------------------------------------------------- @@ -236,6 +121,10 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl data[1] = high; } + public boolean isEmpty() { + return data.length == 0; + } + // CharSequence implementation // --------------------------------------------------------------------------- @@ -267,11 +156,26 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl return readSimpleString(buffer); } + public static SimpleString readNullableSimpleString(ByteBuf buffer, ByteBufSimpleStringPool pool) { + int b = buffer.readByte(); + if (b == DataConstants.NULL) { + return null; + } + return readSimpleString(buffer, pool); + } + public static SimpleString readSimpleString(ByteBuf buffer) { int len = buffer.readInt(); return readSimpleString(buffer, len); } + public static SimpleString readSimpleString(ByteBuf buffer, ByteBufSimpleStringPool 
pool) { + if (pool == null) { + return readSimpleString(buffer); + } + return pool.getOrCreate(buffer); + } + public static SimpleString readSimpleString(final ByteBuf buffer, final int length) { byte[] data = new byte[length]; buffer.readBytes(data); @@ -381,22 +285,23 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl if (other instanceof SimpleString) { SimpleString s = (SimpleString) other; - if (data.length != s.data.length) { - return false; - } - - for (int i = 0; i < data.length; i++) { - if (data[i] != s.data[i]) { - return false; - } - } - - return true; + return ByteUtil.equals(data, s.data); } else { return false; } } + /** + * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s}, + * {@code false} otherwise. + * <p> + * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the + * length field. + */ + public boolean equals(final ByteBuf byteBuf, final int offset, final int length) { + return ByteUtil.equals(data, byteBuf, offset, length); + } + @Override public int hashCode() { if (hash == 0) { @@ -575,4 +480,64 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl dst[d++] = (char) (low | high); } } + + public static final class ByteBufSimpleStringPool extends AbstractByteBufPool<SimpleString> { + + private static final int UUID_LENGTH = 36; + + private final int maxLength; + + public ByteBufSimpleStringPool() { + this.maxLength = UUID_LENGTH; + } + + public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) { + super(capacity); + this.maxLength = maxCharsLength; + } + + @Override + protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) { + if (entry == null) { + return false; + } + return entry.equals(byteBuf, offset, length); + } + + @Override + protected boolean canPool(final ByteBuf 
byteBuf, final int length) { + assert length % 2 == 0 : "length must be a multiple of 2"; + final int expectedStringLength = length >> 1; + return expectedStringLength <= maxLength; + } + + @Override + protected SimpleString create(final ByteBuf byteBuf, final int length) { + return readSimpleString(byteBuf, length); + } + } + + public static final class StringSimpleStringPool extends AbstractPool<String, SimpleString> { + + public StringSimpleStringPool() { + super(); + } + + public StringSimpleStringPool(final int capacity) { + super(capacity); + } + + @Override + protected SimpleString create(String value) { + return toSimpleString(value); + } + + @Override + protected boolean isEqual(SimpleString entry, String value) { + if (entry == null) { + return false; + } + return entry.toString().equals(value); + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java new file mode 100644 index 0000000..87c1b6f --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.util.internal.MathUtil; +import io.netty.util.internal.PlatformDependent; + +/** + * Thread-safe {@code <T>} interner. + * <p> + * Differently from {@link String#intern()} it contains a fixed amount of entries and + * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie + * the same entry could be allocated multiple times by concurrent calls. + */ +public abstract class AbstractByteBufPool<T> { + + public static final int DEFAULT_POOL_CAPACITY = 32; + + private final T[] entries; + private final int mask; + private final int shift; + + public AbstractByteBufPool() { + this(DEFAULT_POOL_CAPACITY); + } + + public AbstractByteBufPool(final int capacity) { + entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)]; + mask = entries.length - 1; + //log2 of entries.length + shift = 31 - Integer.numberOfLeadingZeros(entries.length); + } + + /** + * Batch hash code implementation that works at its best if {@code bytes} + * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} encoded. 
+ */ + private static int hashCode(final ByteBuf bytes, final int offset, final int length) { + if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) { + //if the platform allows it, the hash code could be computed without bounds checking + if (bytes.hasArray()) { + return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length); + } else if (bytes.hasMemoryAddress()) { + return offHeapHashCode(bytes.memoryAddress(), offset, length); + } + } + return byteBufHashCode(bytes, offset, length); + } + + private static int onHeapHashCode(final byte[] bytes, final int offset, final int length) { + final int intCount = length >>> 1; + final int byteCount = length & 1; + int hashCode = 1; + int arrayIndex = offset; + for (int i = 0; i < intCount; i++) { + hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex); + arrayIndex += 2; + } + for (int i = 0; i < byteCount; i++) { + hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++); + } + return hashCode; + } + + private static int offHeapHashCode(final long address, final int offset, final int length) { + final int intCount = length >>> 1; + final int byteCount = length & 1; + int hashCode = 1; + int arrayIndex = offset; + for (int i = 0; i < intCount; i++) { + hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex); + arrayIndex += 2; + } + for (int i = 0; i < byteCount; i++) { + hashCode = 31 * hashCode + PlatformDependent.getByte(address + arrayIndex++); + } + return hashCode; + } + + private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int length) { + final int intCount = length >>> 1; + final int byteCount = length & 1; + int hashCode = 1; + int arrayIndex = offset; + for (int i = 0; i < intCount; i++) { + final short shortLE = byteBuf.getShortLE(arrayIndex); + final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? 
Short.reverseBytes(shortLE) : shortLE; + hashCode = 31 * hashCode + nativeShort; + arrayIndex += 2; + } + for (int i = 0; i < byteCount; i++) { + hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++); + } + return hashCode; + } + + /** + * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be pooled, + * {@code false} otherwise. + */ + protected abstract boolean canPool(ByteBuf byteBuf, int length); + + /** + * Create a new entry. + */ + protected abstract T create(ByteBuf byteBuf, int length); + + /** + * Returns {@code true} if the {@code entry} content is the same of {@code byteBuf} at the specified {@code offset} + * and {@code length} {@code false} otherwise. + */ + protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length); + + /** + * Returns a pooled entry if possible, a new one otherwise. + * <p> + * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it. + */ + public final T getOrCreate(final ByteBuf byteBuf) { + final int length = byteBuf.readInt(); + if (!canPool(byteBuf, length)) { + return create(byteBuf, length); + } else { + if (!byteBuf.isReadable(length)) { + throw new IndexOutOfBoundsException(); + } + final int bytesOffset = byteBuf.readerIndex(); + final int hashCode = hashCode(byteBuf, bytesOffset, length); + //fast % operation with power of 2 entries.length + final int firstIndex = hashCode & mask; + final T firstEntry = entries[firstIndex]; + if (isEqual(firstEntry, byteBuf, bytesOffset, length)) { + byteBuf.skipBytes(length); + return firstEntry; + } + final int secondIndex = (hashCode >> shift) & mask; + final T secondEntry = entries[secondIndex]; + if (isEqual(secondEntry, byteBuf, bytesOffset, length)) { + byteBuf.skipBytes(length); + return secondEntry; + } + final T internedEntry = create(byteBuf, length); + final int entryIndex = firstEntry == null ? 
firstIndex : secondIndex; + entries[entryIndex] = internedEntry; + return internedEntry; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java deleted file mode 100644 index 7e1fe40..0000000 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.utils; - -import io.netty.buffer.ByteBuf; -import io.netty.util.internal.MathUtil; -import io.netty.util.internal.PlatformDependent; - -/** - * Thread-safe {@code <T>} interner. - * <p> - * Differently from {@link String#intern()} it contains a fixed amount of entries and - * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie - * the same entry could be allocated multiple times by concurrent calls. 
- */ -public abstract class AbstractInterner<T> { - - private final T[] entries; - private final int mask; - private final int shift; - - public AbstractInterner(final int capacity) { - entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)]; - mask = entries.length - 1; - //log2 of entries.length - shift = 31 - Integer.numberOfLeadingZeros(entries.length); - } - - /** - * Batch hash code implementation that works at its best if {@code bytes} - * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} encoded. - */ - private static int hashCode(final ByteBuf bytes, final int offset, final int length) { - if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) { - //if the platform allows it, the hash code could be computed without bounds checking - if (bytes.hasArray()) { - return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length); - } else if (bytes.hasMemoryAddress()) { - return offHeapHashCode(bytes.memoryAddress(), offset, length); - } - } - return byteBufHashCode(bytes, offset, length); - } - - private static int onHeapHashCode(final byte[] bytes, final int offset, final int length) { - final int intCount = length >>> 1; - final int byteCount = length & 1; - int hashCode = 1; - int arrayIndex = offset; - for (int i = 0; i < intCount; i++) { - hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex); - arrayIndex += 2; - } - for (int i = 0; i < byteCount; i++) { - hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++); - } - return hashCode; - } - - private static int offHeapHashCode(final long address, final int offset, final int length) { - final int intCount = length >>> 1; - final int byteCount = length & 1; - int hashCode = 1; - int arrayIndex = offset; - for (int i = 0; i < intCount; i++) { - hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex); - arrayIndex += 2; - } - for (int i = 0; i < byteCount; i++) { - hashCode = 31 * hashCode + 
PlatformDependent.getByte(address + arrayIndex++); - } - return hashCode; - } - - private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int length) { - final int intCount = length >>> 1; - final int byteCount = length & 1; - int hashCode = 1; - int arrayIndex = offset; - for (int i = 0; i < intCount; i++) { - final short shortLE = byteBuf.getShortLE(arrayIndex); - final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(shortLE) : shortLE; - hashCode = 31 * hashCode + nativeShort; - arrayIndex += 2; - } - for (int i = 0; i < byteCount; i++) { - hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++); - } - return hashCode; - } - - /** - * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be interned, - * {@code false} otherwise. - */ - protected abstract boolean canIntern(ByteBuf byteBuf, int length); - - /** - * Create a new entry. - */ - protected abstract T create(ByteBuf byteBuf, int length); - - /** - * Returns {@code true} if the {@code entry} content is the same of {@code byteBuf} at the specified {@code offset} - * and {@code length} {@code false} otherwise. - */ - protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length); - - /** - * Returns and interned entry if possible, a new one otherwise. - * <p> - * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it. 
- */ - public final T intern(final ByteBuf byteBuf, final int length) { - if (!canIntern(byteBuf, length)) { - return create(byteBuf, length); - } else { - if (!byteBuf.isReadable(length)) { - throw new IndexOutOfBoundsException(); - } - final int bytesOffset = byteBuf.readerIndex(); - final int hashCode = hashCode(byteBuf, bytesOffset, length); - //fast % operation with power of 2 entries.length - final int firstIndex = hashCode & mask; - final T firstEntry = entries[firstIndex]; - if (isEqual(firstEntry, byteBuf, bytesOffset, length)) { - byteBuf.skipBytes(length); - return firstEntry; - } - final int secondIndex = (hashCode >> shift) & mask; - final T secondEntry = entries[secondIndex]; - if (isEqual(secondEntry, byteBuf, bytesOffset, length)) { - byteBuf.skipBytes(length); - return secondEntry; - } - final T internedEntry = create(byteBuf, length); - final int entryIndex = firstEntry == null ? firstIndex : secondIndex; - entries[entryIndex] = internedEntry; - return internedEntry; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java new file mode 100644 index 0000000..cc42e8f --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.util.internal.MathUtil; + +/** + * Thread-safe {@code <T>} interner. + * <p> + * Differently from {@link String#intern()} it contains a fixed amount of entries and + * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie + * the same entry could be allocated multiple times by concurrent calls. + */ +public abstract class AbstractPool<I, O> { + + public static final int DEFAULT_POOL_CAPACITY = 32; + + private final O[] entries; + private final int mask; + private final int shift; + + public AbstractPool() { + this(DEFAULT_POOL_CAPACITY); + } + + public AbstractPool(final int capacity) { + entries = (O[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)]; + mask = entries.length - 1; + //log2 of entries.length + shift = 31 - Integer.numberOfLeadingZeros(entries.length); + } + + /** + * Create a new entry. + */ + protected abstract O create(I value); + + /** + * Returns {@code true} if the {@code entry} content is equal to {@code value}; + */ + protected abstract boolean isEqual(O entry, I value); + + protected int hashCode(I value) { + return value.hashCode(); + } + + /** + * Returns and interned entry if possible, a new one otherwise. + * <p> + * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it. 
+ */ + public final O getOrCreate(final I value) { + if (value == null) { + return null; + } + final int hashCode = hashCode(value); + //fast % operation with power of 2 entries.length + final int firstIndex = hashCode & mask; + final O firstEntry = entries[firstIndex]; + if (isEqual(firstEntry, value)) { + return firstEntry; + } + final int secondIndex = (hashCode >> shift) & mask; + final O secondEntry = entries[secondIndex]; + if (isEqual(secondEntry, value)) { + return secondEntry; + } + final O internedEntry = create(value); + final int entryIndex = firstEntry == null ? firstIndex : secondIndex; + entries[entryIndex] = internedEntry; + return internedEntry; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index e70891d..8835797 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -22,6 +22,7 @@ import java.util.regex.Pattern; import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; @@ -207,4 +208,125 @@ public class ByteUtil { throw ActiveMQUtilBundle.BUNDLE.failedToParseLong(text); } } + + public static boolean equals(final byte[] left, final byte[] right) { + return equals(left, right, 0, right.length); + } + + public static boolean equals(final byte[] left, + final byte[] right, + final int rightOffset, + final int rightLength) { + if (left == 
right) + return true; + if (left == null || right == null) + return false; + if (left.length != rightLength) + return false; + if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) { + return equalsUnsafe(left, right, rightOffset, rightLength); + } else { + return equalsSafe(left, right, rightOffset, rightLength); + } + } + + private static boolean equalsSafe(byte[] left, byte[] right, int rightOffset, int rightLength) { + for (int i = 0; i < rightLength; i++) + if (left[i] != right[rightOffset + i]) + return false; + return true; + } + + private static boolean equalsUnsafe(final byte[] left, + final byte[] right, + final int rightOffset, + final int rightLength) { + final int longCount = rightLength >>> 3; + final int bytesCount = rightLength & 7; + int bytesIndex = rightOffset; + int charsIndex = 0; + for (int i = 0; i < longCount; i++) { + final long charsLong = PlatformDependent.getLong(left, charsIndex); + final long bytesLong = PlatformDependent.getLong(right, bytesIndex); + if (charsLong != bytesLong) { + return false; + } + bytesIndex += 8; + charsIndex += 8; + } + for (int i = 0; i < bytesCount; i++) { + final byte charsByte = PlatformDependent.getByte(left, charsIndex); + final byte bytesByte = PlatformDependent.getByte(right, bytesIndex); + if (charsByte != bytesByte) { + return false; + } + bytesIndex++; + charsIndex++; + } + return true; + } + + + /** + * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s}, + * {@code false} otherwise. + * <p> + * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the + * length field. 
+ */ + public static boolean equals(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) { + if (bytes.length != length) + return false; + if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) { + if ((offset + length) > byteBuf.writerIndex()) { + throw new IndexOutOfBoundsException(); + } + if (byteBuf.hasArray()) { + return equals(bytes, byteBuf.array(), byteBuf.arrayOffset() + offset, length); + } else if (byteBuf.hasMemoryAddress()) { + return equalsOffHeap(bytes, byteBuf.memoryAddress(), offset, length); + } + } + return equalsOnHeap(bytes, byteBuf, offset, length); + } + + private static boolean equalsOnHeap(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) { + if (bytes.length != length) + return false; + for (int i = 0; i < length; i++) + if (bytes[i] != byteBuf.getByte(offset + i)) + return false; + return true; + } + + private static boolean equalsOffHeap(final byte[] bytes, + final long address, + final int offset, + final int length) { + final int longCount = length >>> 3; + final int bytesCount = length & 7; + long bytesAddress = address + offset; + int charsIndex = 0; + for (int i = 0; i < longCount; i++) { + final long charsLong = PlatformDependent.getLong(bytes, charsIndex); + final long bytesLong = PlatformDependent.getLong(bytesAddress); + if (charsLong != bytesLong) { + return false; + + } + bytesAddress += 8; + charsIndex += 8; + } + for (int i = 0; i < bytesCount; i++) { + final byte charsByte = PlatformDependent.getByte(bytes, charsIndex); + final byte bytesByte = PlatformDependent.getByte(bytesAddress); + if (charsByte != bytesByte) { + return false; + + } + bytesAddress++; + charsIndex++; + } + return true; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java ---------------------------------------------------------------------- diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java index a3e4876..56beb76 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java @@ -28,7 +28,7 @@ import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; -import org.apache.activemq.artemis.utils.AbstractInterner; +import org.apache.activemq.artemis.utils.AbstractByteBufPool; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; @@ -332,8 +332,7 @@ public class TypedProperties { } public synchronized void decode(final ByteBuf buffer, - final SimpleString.Interner keyInterner, - final StringValue.Interner valueInterner) { + final TypedPropertiesDecoderPools keyValuePools) { byte b = buffer.readByte(); if (b == DataConstants.NULL) { @@ -346,15 +345,7 @@ public class TypedProperties { size = 0; for (int i = 0; i < numHeaders; i++) { - final SimpleString key; - int len = buffer.readInt(); - if (keyInterner != null) { - key = keyInterner.intern(buffer, len); - } else { - byte[] data = new byte[len]; - buffer.readBytes(data); - key = new SimpleString(data); - } + final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? 
null : keyValuePools.getPropertyKeysPool()); byte type = buffer.readByte(); @@ -412,12 +403,7 @@ public class TypedProperties { break; } case STRING: { - if (valueInterner != null) { - final int length = buffer.readInt(); - val = valueInterner.intern(buffer, length); - } else { - val = new StringValue(buffer); - } + val = StringValue.readStringValue(buffer, keyValuePools == null ? null : keyValuePools.getPropertyValuesPool()); doPutValue(key, val); break; } @@ -430,7 +416,7 @@ public class TypedProperties { } public synchronized void decode(final ByteBuf buffer) { - decode(buffer, null, null); + decode(buffer, null); } public synchronized void encode(final ByteBuf buffer) { @@ -901,25 +887,61 @@ public class TypedProperties { public static final class StringValue extends PropertyValue { - public static final class Interner extends AbstractInterner<StringValue> { + final SimpleString val; + + private StringValue(final SimpleString val) { + this.val = val; + } + + static StringValue readStringValue(final ByteBuf byteBuf, ByteBufStringValuePool pool) { + if (pool == null) { + return new StringValue(SimpleString.readSimpleString(byteBuf)); + } else { + return pool.getOrCreate(byteBuf); + } + } + + @Override + public Object getValue() { + return val; + } + + @Override + public void write(final ByteBuf buffer) { + buffer.writeByte(DataConstants.STRING); + SimpleString.writeSimpleString(buffer, val); + } + + @Override + public int encodeSize() { + return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val); + } + + public static final class ByteBufStringValuePool extends AbstractByteBufPool<StringValue> { + + private static final int UUID_LENGTH = 36; private final int maxLength; - public Interner(final int capacity, final int maxCharsLength) { + public ByteBufStringValuePool() { + this.maxLength = UUID_LENGTH; + } + + public ByteBufStringValuePool(final int capacity, final int maxCharsLength) { super(capacity); this.maxLength = maxCharsLength; } @Override protected 
boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) { - if (entry == null) { + if (entry == null || entry.val == null) { return false; } - return SimpleString.isEqual(entry.val, byteBuf, offset, length); + return entry.val.equals(byteBuf, offset, length); } @Override - protected boolean canIntern(final ByteBuf byteBuf, final int length) { + protected boolean canPool(final ByteBuf byteBuf, final int length) { assert length % 2 == 0 : "length must be a multiple of 2"; final int expectedStringLength = length >> 1; return expectedStringLength <= maxLength; @@ -930,31 +952,53 @@ public class TypedProperties { return new StringValue(SimpleString.readSimpleString(byteBuf, length)); } } + } - final SimpleString val; + public static class TypedPropertiesDecoderPools { - private StringValue(final SimpleString val) { - this.val = val; + private SimpleString.ByteBufSimpleStringPool propertyKeysPool; + private TypedProperties.StringValue.ByteBufStringValuePool propertyValuesPool; + + public TypedPropertiesDecoderPools() { + this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(); + this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(); } - private StringValue(final ByteBuf buffer) { - val = SimpleString.readSimpleString(buffer); + public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) { + this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength); + this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength); } - @Override - public Object getValue() { - return val; + public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() { + return propertyKeysPool; } - @Override - public void write(final ByteBuf buffer) { - buffer.writeByte(DataConstants.STRING); - SimpleString.writeSimpleString(buffer, val); + public 
TypedProperties.StringValue.ByteBufStringValuePool getPropertyValuesPool() { + return propertyValuesPool; } + } - @Override - public int encodeSize() { - return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val); + public static class TypedPropertiesStringSimpleStringPools { + + private SimpleString.StringSimpleStringPool propertyKeysPool; + private SimpleString.StringSimpleStringPool propertyValuesPool; + + public TypedPropertiesStringSimpleStringPools() { + this.propertyKeysPool = new SimpleString.StringSimpleStringPool(); + this.propertyValuesPool = new SimpleString.StringSimpleStringPool(); + } + + public TypedPropertiesStringSimpleStringPools(int keyPoolCapacity, int valuePoolCapacity) { + this.propertyKeysPool = new SimpleString.StringSimpleStringPool(keyPoolCapacity); + this.propertyValuesPool = new SimpleString.StringSimpleStringPool(valuePoolCapacity); + } + + public SimpleString.StringSimpleStringPool getPropertyKeysPool() { + return propertyKeysPool; + } + + public SimpleString.StringSimpleStringPool getPropertyValuesPool() { + return propertyValuesPool; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index ddb8a3b..d24cd95 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; /** @@ -587,6 +588,8 @@ public interface Message { 
Message putStringProperty(SimpleString key, SimpleString value); + Message putStringProperty(SimpleString key, String value); + /** * Returns the size of the <em>encoded</em> message. */ @@ -649,6 +652,9 @@ public interface Message { /** This should make you convert your message into Core format. */ ICoreMessage toCore(); + /** This should make you convert your message into Core format. */ + ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools); + int getMemoryEstimate(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java index 91fb6ca..8068aa9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -59,6 +60,10 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter public ClientMessageImpl() { } + public ClientMessageImpl(CoreMessageObjectPools coreMessageObjectPools) { + super(coreMessageObjectPools); + } + protected 
ClientMessageImpl(ClientMessageImpl other) { super(other); } @@ -96,11 +101,22 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter final long expiration, final long timestamp, final byte priority, - final int initialMessageBufferSize) { + final int initialMessageBufferSize, + final CoreMessageObjectPools coreMessageObjectPools) { + super(coreMessageObjectPools); this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable). setPriority(priority).initBuffer(initialMessageBufferSize); } + public ClientMessageImpl(final byte type, + final boolean durable, + final long expiration, + final long timestamp, + final byte priority, + final int initialMessageBufferSize) { + this(type, durable, expiration, timestamp, priority, initialMessageBufferSize, null); + } + @Override public TypedProperties getProperties() { return this.checkProperties(); @@ -286,6 +302,11 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter } @Override + public ClientMessageImpl putStringProperty(final SimpleString key, final String value) { + return (ClientMessageImpl) super.putStringProperty(key, value); + } + + @Override public ClientMessageImpl putObjectProperty(final SimpleString key, final Object value) throws ActiveMQPropertyConversionException { return (ClientMessageImpl) super.putObjectProperty(key, value); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 61784ad..b5f8a1b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -148,6 +149,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private final Executor closeExecutor; + private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); + ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory, final String name, final String username, @@ -869,7 +872,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final long expiration, final long timestamp, final byte priority) { - return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize); + return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize, coreMessageObjectPools); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 4ebf97e..888b785 100644 --- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -93,18 +93,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { protected volatile TypedProperties properties; - private final SimpleString.Interner keysInterner; - private final TypedProperties.StringValue.Interner valuesInterner; + private final CoreMessageObjectPools coreMessageObjectPools; - public CoreMessage(final SimpleString.Interner keysInterner, - final TypedProperties.StringValue.Interner valuesInterner) { - this.keysInterner = keysInterner; - this.valuesInterner = valuesInterner; + public CoreMessage(final CoreMessageObjectPools coreMessageObjectPools) { + this.coreMessageObjectPools = coreMessageObjectPools; } public CoreMessage() { - this.keysInterner = null; - this.valuesInterner = null; + this.coreMessageObjectPools = null; } /** On core there's no delivery annotation */ @@ -326,10 +322,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } public CoreMessage(long id, int bufferSize) { + this(id, bufferSize, null); + } + + public CoreMessage(long id, int bufferSize, CoreMessageObjectPools coreMessageObjectPools) { this.initBuffer(bufferSize); this.setMessageID(id); - this.keysInterner = null; - this.valuesInterner = null; + this.coreMessageObjectPools = coreMessageObjectPools; } protected CoreMessage(CoreMessage other, TypedProperties copyProperties) { @@ -343,8 +342,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { this.timestamp = other.timestamp; this.priority = other.priority; this.userID = other.userID; - this.keysInterner = other.keysInterner; - this.valuesInterner = other.valuesInterner; + this.coreMessageObjectPools = other.coreMessageObjectPools; if (copyProperties != null) { this.properties = new TypedProperties(copyProperties); } @@ -424,7 +422,7 @@ 
public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage setValidatedUserID(String validatedUserID) { - putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID)); + putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID, getPropertyValuesPool())); return this; } @@ -479,7 +477,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { TypedProperties properties = new TypedProperties(); if (buffer != null && propertiesLocation >= 0) { final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); - properties.decode(byteBuf, keysInterner, valuesInterner); + properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); } this.properties = properties; } @@ -543,17 +541,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) { messageIDPosition = buffer.readerIndex(); messageID = buffer.readLong(); - int b = buffer.readByte(); - if (b != DataConstants.NULL) { - final int length = buffer.readInt(); - if (keysInterner != null) { - address = keysInterner.intern(buffer, length); - } else { - address = SimpleString.readSimpleString(buffer, length); - } - } else { - address = null; - } + + address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool()); if (buffer.readByte() == DataConstants.NOT_NULL) { byte[] bytes = new byte[16]; buffer.readBytes(bytes); @@ -571,7 +560,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { propertiesLocation = buffer.readerIndex(); } else { properties = new TypedProperties(); - properties.decode(buffer, keysInterner, valuesInterner); + properties.decode(buffer, coreMessageObjectPools == null ? 
null : coreMessageObjectPools.getPropertiesDecoderPools()); } } @@ -671,7 +660,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage setAddress(String address) { messageChanged(); - this.address = SimpleString.toSimpleString(address); + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); return this; } @@ -703,7 +692,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putBooleanProperty(final String key, final boolean value) { messageChanged(); checkProperties(); - properties.putBooleanProperty(new SimpleString(key), value); + properties.putBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -724,7 +713,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException { checkProperties(); - return properties.getBooleanProperty(new SimpleString(key)); + return properties.getBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -739,7 +728,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putByteProperty(final String key, final byte value) { messageChanged(); checkProperties(); - properties.putByteProperty(new SimpleString(key), value); + properties.putByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -752,7 +741,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException { - return getByteProperty(SimpleString.toSimpleString(key)); + return getByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -768,7 +757,7 @@ public class 
CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putBytesProperty(final String key, final byte[] value) { messageChanged(); checkProperties(); - properties.putBytesProperty(new SimpleString(key), value); + properties.putBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -780,7 +769,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException { - return getBytesProperty(new SimpleString(key)); + return getBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -795,7 +784,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putCharProperty(String key, char value) { messageChanged(); checkProperties(); - properties.putCharProperty(new SimpleString(key), value); + properties.putCharProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -811,7 +800,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putShortProperty(final String key, final short value) { messageChanged(); checkProperties(); - properties.putShortProperty(new SimpleString(key), value); + properties.putShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -827,7 +816,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putIntProperty(final String key, final int value) { messageChanged(); checkProperties(); - properties.putIntProperty(new SimpleString(key), value); + properties.putIntProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -854,7 +843,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putLongProperty(final String key, final long value) { messageChanged(); 
checkProperties(); - properties.putLongProperty(new SimpleString(key), value); + properties.putLongProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -882,7 +871,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putFloatProperty(final String key, final float value) { messageChanged(); checkProperties(); - properties.putFloatProperty(new SimpleString(key), value); + properties.putFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -898,7 +887,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putDoubleProperty(final String key, final double value) { messageChanged(); checkProperties(); - properties.putDoubleProperty(new SimpleString(key), value); + properties.putDoubleProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -924,10 +913,19 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } @Override + public CoreMessage putStringProperty(final SimpleString key, final String value) { + messageChanged(); + checkProperties(); + properties.putSimpleStringProperty(key, SimpleString.toSimpleString(value, getPropertyValuesPool())); + return this; + } + + + @Override public CoreMessage putStringProperty(final String key, final String value) { messageChanged(); checkProperties(); - properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value)); + properties.putSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), SimpleString.toSimpleString(value, getPropertyValuesPool())); return this; } @@ -943,7 +941,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Object getObjectProperty(final String key) { checkProperties(); - return getObjectProperty(SimpleString.toSimpleString(key)); + return 
getObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -955,7 +953,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException { messageChanged(); - putObjectProperty(new SimpleString(key), value); + putObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -968,7 +966,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException { checkProperties(); - return properties.getShortProperty(new SimpleString(key)); + return properties.getShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -980,7 +978,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException { checkProperties(); - return properties.getFloatProperty(new SimpleString(key)); + return properties.getFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -996,7 +994,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public String getStringProperty(final String key) throws ActiveMQPropertyConversionException { - return getStringProperty(new SimpleString(key)); + return getStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -1008,7 +1006,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException { checkProperties(); - return properties.getSimpleStringProperty(new SimpleString(key)); + return properties.getSimpleStringProperty(SimpleString.toSimpleString(key, 
getPropertyKeysPool())); } @Override @@ -1025,7 +1023,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public Object removeProperty(final String key) { messageChanged(); checkProperties(); - Object oldValue = properties.removeProperty(new SimpleString(key)); + Object oldValue = properties.removeProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); if (oldValue != null) { messageChanged(); } @@ -1041,7 +1039,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public boolean containsProperty(final String key) { checkProperties(); - return properties.containsProperty(new SimpleString(key)); + return properties.containsProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -1116,6 +1114,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } @Override + public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { + return this; + } + + @Override public String toString() { try { checkProperties(); @@ -1135,4 +1138,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return new java.util.Date(timestamp).toString(); } } + + private SimpleString.StringSimpleStringPool getPropertyKeysPool() { + return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool(); + } + + private SimpleString.StringSimpleStringPool getPropertyValuesPool() { + return coreMessageObjectPools == null ? 
null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java new file mode 100644 index 0000000..d4e3ed1 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.core.message.impl; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.utils.collections.TypedProperties; + +public class CoreMessageObjectPools { + + private Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new); + private Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new); + + private Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new); + private Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new); + private Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new); + + public CoreMessageObjectPools() { + } + + public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() { + return addressDecoderPool.get(); + } + + public SimpleString.StringSimpleStringPool getAddressStringSimpleStringPool() { + return addressStringSimpleStringPool.get(); + } + + public SimpleString.StringSimpleStringPool getGroupIdStringSimpleStringPool() { + return groupIdStringSimpleStringPool.get(); + } + + public TypedProperties.TypedPropertiesDecoderPools getPropertiesDecoderPools() { + return propertiesDecoderPools.get(); + } + + public TypedProperties.TypedPropertiesStringSimpleStringPools getPropertiesStringSimpleStringPools() { + return propertiesStringSimpleStringPools.get(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java 
---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java index 787e499..ad8c7a9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder; @@ -32,11 +33,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES public class ClientPacketDecoder extends PacketDecoder { private static final long serialVersionUID = 6952614096979334582L; - public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder(); - - protected ClientPacketDecoder() { - - } + protected final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); @Override public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) { @@ -56,9 +53,9 @@ public class ClientPacketDecoder extends PacketDecoder { switch (packetType) { case SESS_RECEIVE_MSG: { if (connection.isVersionBeforeAddressChange()) { - packet = new SessionReceiveMessage_1X(new ClientMessageImpl()); + packet = new SessionReceiveMessage_1X(new ClientMessageImpl(coreMessageObjectPools)); } else { - packet = new 
SessionReceiveMessage(new ClientMessageImpl()); + packet = new SessionReceiveMessage(new ClientMessageImpl(coreMessageObjectPools)); } break; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index f0005ff..c58a0bd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -511,7 +511,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } protected PacketDecoder createPacketDecoder() { - return ClientPacketDecoder.INSTANCE; + return new ClientPacketDecoder(); } private void forceReturnChannel1(ActiveMQException cause) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java index 2660f96..e8f5920 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java @@ -109,12 +109,18 @@ public class MessageUtil { return message.getSimpleStringProperty(REPLYTO_HEADER_NAME); } - public static void 
setJMSReplyTo(Message message, final SimpleString dest) { - + public static void setJMSReplyTo(Message message, final String dest) { if (dest == null) { message.removeProperty(REPLYTO_HEADER_NAME); } else { + message.putStringProperty(REPLYTO_HEADER_NAME, dest); + } + } + public static void setJMSReplyTo(Message message, final SimpleString dest) { + if (dest == null) { + message.removeProperty(REPLYTO_HEADER_NAME); + } else { message.putStringProperty(REPLYTO_HEADER_NAME, dest); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java index ec94011..310b4ed 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java @@ -337,7 +337,7 @@ public class CoreMessageTest { public String generate(String body) throws Exception { - ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024); + ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024, null); TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body)); message.setAddress(ADDRESS); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java index 626dd4d..7750564 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java @@ -99,26 +99,28 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se } } - public static String createQueueNameForSubscription(final boolean isDurable, + public static SimpleString createQueueNameForSubscription(final boolean isDurable, final String clientID, final String subscriptionName) { + final String queueName; if (clientID != null) { if (isDurable) { - return ActiveMQDestination.escape(clientID) + SEPARATOR + + queueName = ActiveMQDestination.escape(clientID) + SEPARATOR + ActiveMQDestination.escape(subscriptionName); } else { - return "nonDurable" + SEPARATOR + + queueName = "nonDurable" + SEPARATOR + ActiveMQDestination.escape(clientID) + SEPARATOR + ActiveMQDestination.escape(subscriptionName); } } else { if (isDurable) { - return ActiveMQDestination.escape(subscriptionName); + queueName = ActiveMQDestination.escape(subscriptionName); } else { - return "nonDurable" + SEPARATOR + + queueName = "nonDurable" + SEPARATOR + ActiveMQDestination.escape(subscriptionName); } } + return SimpleString.toSimpleString(queueName); } public static String createQueueNameForSharedSubscription(final boolean isDurable, @@ -192,10 +194,18 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se return new ActiveMQQueue(address); } + public static ActiveMQQueue createQueue(final SimpleString address) { + return new ActiveMQQueue(address); + } + public static ActiveMQTopic createTopic(final String address) { return new ActiveMQTopic(address); } + public static ActiveMQTopic createTopic(final SimpleString address) { + return new ActiveMQTopic(address); + } + public static 
ActiveMQTemporaryQueue createTemporaryQueue(final String address, final ActiveMQSession session) { return new ActiveMQTemporaryQueue(address, session); }
