Repository: activemq-artemis
Updated Branches:
  refs/heads/master 7a0a376dd -> 57fd708dc


ARTEMIS-969 Unecessary buffer expansion on message delivery


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f38d5c7d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f38d5c7d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f38d5c7d

Branch: refs/heads/master
Commit: f38d5c7dbcb66bebc5b9fba984845bd8d6aadc0c
Parents: 7a0a376
Author: Clebert Suconic <[email protected]>
Authored: Wed Feb 15 12:51:05 2017 -0500
Committer: Clebert Suconic <[email protected]>
Committed: Wed Feb 15 13:49:00 2017 -0500

----------------------------------------------------------------------
 .../impl/wireformat/SessionReceiveMessage.java  |  4 +-
 .../tests/integration/client/ConsumerTest.java  | 48 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f38d5c7d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index ce76186..c21ebda 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -56,8 +56,8 @@ public class SessionReceiveMessage extends MessagePacket {
    public ActiveMQBuffer encode(final RemotingConnection connection) {
       ActiveMQBuffer buffer = message.getEncodedBuffer();
 
-      ActiveMQBuffer bufferWrite = 
connection.createTransportBuffer(buffer.writerIndex(), true);
-      bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity());
+      ActiveMQBuffer bufferWrite = 
connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG 
+ DataConstants.SIZE_INT, true);
+      bufferWrite.writeBytes(buffer, 0, buffer.capacity());
       bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
 
       // Sanity check

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f38d5c7d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 1c1f929..8f00b2a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -34,6 +35,7 @@ import 
org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,6 +99,50 @@ public class ConsumerTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testSimpleSend() throws Throwable {
+      receive(false);
+   }
+
+   @Test
+   public void testSimpleSendWithCloseConsumer() throws Throwable {
+      receive(true);
+   }
+
+   private void receive(boolean cancelOnce) throws Throwable {
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true, false);
+
+      session.createQueue(QUEUE, QUEUE, null, false);
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+      ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 
0, System.currentTimeMillis(), (byte) 4);
+      message.getBodyBuffer().writeString("hi");
+      message.putStringProperty("hello", "elo");
+      producer.send(message);
+
+      session.start();
+
+      if (cancelOnce) {
+         final ClientConsumerInternal consumerInternal = 
(ClientConsumerInternal)consumer;
+         Wait.waitFor(() -> consumerInternal.getBufferSize() > 0);
+         consumer.close();
+         consumer = session.createConsumer(QUEUE);
+      }
+      ClientMessage message2 = consumer.receive(1000);
+
+      System.out.println("Id::" + message2.getMessageID());
+
+      System.out.println("Received " + message2);
+
+      session.close();
+   }
+
+
+
+   @Test
    public void testConsumerAckImmediateAutoCommitTrue() throws Exception {
       ClientSessionFactory sf = createSessionFactory(locator);
 
@@ -323,6 +370,7 @@ public class ConsumerTest extends ActiveMQTestBase {
       ClientSessionFactory sf = createSessionFactory(locator);
 
       ClientSession session = sf.createSession(false, true, true);
+      session.start();
 
       session.createQueue(QUEUE, QUEUE, null, false);
 

Reply via email to