Repository: activemq-artemis Updated Branches: refs/heads/master 7a0a376dd -> 57fd708dc
ARTEMIS-969 Unecessary buffer expansion on message delivery Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f38d5c7d Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f38d5c7d Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f38d5c7d Branch: refs/heads/master Commit: f38d5c7dbcb66bebc5b9fba984845bd8d6aadc0c Parents: 7a0a376 Author: Clebert Suconic <[email protected]> Authored: Wed Feb 15 12:51:05 2017 -0500 Committer: Clebert Suconic <[email protected]> Committed: Wed Feb 15 13:49:00 2017 -0500 ---------------------------------------------------------------------- .../impl/wireformat/SessionReceiveMessage.java | 4 +- .../tests/integration/client/ConsumerTest.java | 48 ++++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f38d5c7d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java index ce76186..c21ebda 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java @@ -56,8 +56,8 @@ public class SessionReceiveMessage extends MessagePacket { public ActiveMQBuffer encode(final RemotingConnection connection) { ActiveMQBuffer buffer = message.getEncodedBuffer(); - ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex(), true); - bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity()); + ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true); + bufferWrite.writeBytes(buffer, 0, buffer.capacity()); bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); // Sanity check http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f38d5c7d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 1c1f929..8f00b2a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -34,6 +35,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; @@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.junit.Assert; import org.junit.Before; @@ -96,6 +99,50 @@ public class ConsumerTest extends ActiveMQTestBase { } @Test + public void testSimpleSend() throws Throwable { + receive(false); + } + + @Test + public void testSimpleSendWithCloseConsumer() throws Throwable { + receive(true); + } + + private void receive(boolean cancelOnce) throws Throwable { + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true, false); + + session.createQueue(QUEUE, QUEUE, null, false); + + ClientConsumer consumer = session.createConsumer(QUEUE); + + ClientProducer producer = session.createProducer(QUEUE); + ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4); + message.getBodyBuffer().writeString("hi"); + message.putStringProperty("hello", "elo"); + producer.send(message); + + session.start(); + + if (cancelOnce) { + final ClientConsumerInternal consumerInternal = (ClientConsumerInternal)consumer; + Wait.waitFor(() -> consumerInternal.getBufferSize() > 0); + consumer.close(); + consumer = session.createConsumer(QUEUE); + } + ClientMessage message2 = consumer.receive(1000); + + System.out.println("Id::" + message2.getMessageID()); + + System.out.println("Received " + message2); + + session.close(); + } + + + + @Test public void testConsumerAckImmediateAutoCommitTrue() throws Exception { ClientSessionFactory sf = createSessionFactory(locator); @@ -323,6 +370,7 @@ public class ConsumerTest extends ActiveMQTestBase { ClientSessionFactory sf = createSessionFactory(locator); ClientSession session = sf.createSession(false, true, true); + session.start(); session.createQueue(QUEUE, QUEUE, null, false);
