ARTEMIS-1308: Make acknowledge in ActiveMQMessage non-blocking

Allow the commit performed within acknowledge to be non-blocking (batched);
this is toggled by the existing blockOnAcknowledge config.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7b40abea
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7b40abea
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7b40abea

Branch: refs/heads/master
Commit: 7b40abead95b36e5769769373d6f7bab8e34dde9
Parents: 88f78d9
Author: Michael Andre Pearce <michael.andre.pea...@me.com>
Authored: Fri Jul 28 14:27:29 2017 +0100
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Tue Aug 8 10:07:15 2017 -0400

----------------------------------------------------------------------
 .../artemis/api/core/client/ClientSession.java  | 10 ++-
 .../core/client/impl/ClientSessionImpl.java     | 10 ++-
 .../core/impl/ActiveMQSessionContext.java       |  9 +++
 .../spi/core/remoting/SessionContext.java       |  3 +
 .../artemis/jms/client/ActiveMQConnection.java  |  7 +-
 .../artemis/jms/client/ActiveMQMessage.java     | 15 ++++-
 .../jms/client/ActiveMQMessageConsumer.java     |  3 +
 .../jms/client/JMSMessageListenerWrapper.java   |  9 ++-
 .../jms/consumer/JmsConsumerTest.java           |  9 ++-
 .../artemis/jms/tests/AcknowledgementTest.java  | 69 ++++++++++++++++++++
 .../jms/tests/message/MessageHeaderTest.java    |  4 ++
 11 files changed, 137 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index c3d6749..ab59eb6 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -894,13 +894,21 @@ public interface ClientSession extends XAResource, 
AutoCloseable {
    boolean isXA();
 
    /**
-    * Commits the current transaction.
+    * Commits the current transaction, blocking.
     *
     * @throws ActiveMQException if an exception occurs while committing the 
transaction
     */
    void commit() throws ActiveMQException;
 
    /**
+    * Commits the current transaction.
+    *
+    * @param block if the commit will be blocking or not.
+    * @throws ActiveMQException if an exception occurs while committing the 
transaction
+    */
+   void commit(boolean block) throws ActiveMQException;
+
+   /**
     * Rolls back the current transaction.
     *
     * @throws ActiveMQException if an exception occurs while rolling back the 
transaction

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 5f6b40b..ef4e87c 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -761,6 +761,11 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
 
    @Override
    public void commit() throws ActiveMQException {
+      commit(true);
+   }
+
+   @Override
+   public void commit(boolean block) throws ActiveMQException {
       checkClosed();
 
       if (logger.isTraceEnabled()) {
@@ -782,8 +787,9 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
       if (rollbackOnly) {
          rollbackOnFailover(true);
       }
+      startCall();
       try {
-         sessionContext.simpleCommit();
+         sessionContext.simpleCommit(block);
       } catch (ActiveMQException e) {
          if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == 
ActiveMQExceptionType.CONNECTION_TIMEDOUT || rollbackOnly) {
             // The call to commit was unlocked on failover, we therefore 
rollback the tx,
@@ -794,6 +800,8 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
          } else {
             throw e;
          }
+      } finally {
+         endCall();
       }
 
       //oops, we have failed over during the commit and don't know what 
happened

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index fc43672..d0d75ac 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -349,6 +349,15 @@ public class ActiveMQSessionContext extends SessionContext 
{
    }
 
    @Override
+   public void simpleCommit(boolean block) throws ActiveMQException {
+      if (block) {
+         sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), 
PacketImpl.NULL_RESPONSE);
+      } else {
+         sessionChannel.sendBatched(new PacketImpl(PacketImpl.SESS_COMMIT));
+      }
+   }
+
+   @Override
    public void simpleRollback(boolean lastMessageAsDelivered) throws 
ActiveMQException {
       sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), 
PacketImpl.NULL_RESPONSE);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index b123960..78135a8 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -214,6 +214,9 @@ public abstract class SessionContext {
 
    public abstract void simpleCommit() throws ActiveMQException;
 
+   public abstract void simpleCommit(boolean block) throws ActiveMQException;
+
+
    /**
     * If we are doing a simple rollback on the RA, we need to ack the last 
message sent to the consumer,
     * otherwise DLQ won't work.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
index 6432af2..bf0d236 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
@@ -597,7 +597,8 @@ public class ActiveMQConnection extends 
ActiveMQConnectionForContextImpl impleme
 
       try {
          ClientSession session;
-
+         boolean isBlockOnAcknowledge = 
sessionFactory.getServerLocator().isBlockOnAcknowledge();
+         int ackBatchSize = 
sessionFactory.getServerLocator().getAckBatchSize();
          if (acknowledgeMode == Session.SESSION_TRANSACTED) {
             session = sessionFactory.createSession(username, password, isXA, 
false, false, sessionFactory.getServerLocator().isPreAcknowledge(), 
transactionBatchSize);
          } else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE) {
@@ -605,9 +606,9 @@ public class ActiveMQConnection extends 
ActiveMQConnectionForContextImpl impleme
          } else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) {
             session = sessionFactory.createSession(username, password, isXA, 
true, true, sessionFactory.getServerLocator().isPreAcknowledge(), 
dupsOKBatchSize);
          } else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
-            session = sessionFactory.createSession(username, password, isXA, 
true, false, sessionFactory.getServerLocator().isPreAcknowledge(), 
transactionBatchSize);
+            session = sessionFactory.createSession(username, password, isXA, 
true, false, sessionFactory.getServerLocator().isPreAcknowledge(), 
isBlockOnAcknowledge ? transactionBatchSize : ackBatchSize);
          } else if (acknowledgeMode == 
ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) {
-            session = sessionFactory.createSession(username, password, isXA, 
true, false, false, transactionBatchSize);
+            session = sessionFactory.createSession(username, password, isXA, 
true, false, false, isBlockOnAcknowledge ? transactionBatchSize : ackBatchSize);
          } else if (acknowledgeMode == ActiveMQJMSConstants.PRE_ACKNOWLEDGE) {
             session = sessionFactory.createSession(username, password, isXA, 
true, false, true, transactionBatchSize);
          } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index f13f602..928d375 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.reader.MessageUtil;
@@ -200,6 +201,8 @@ public class ActiveMQMessage implements javax.jms.Message {
 
    private boolean individualAck;
 
+   private boolean clientAck;
+
    private long jmsDeliveryTime;
 
    // Constructors --------------------------------------------------
@@ -710,11 +713,15 @@ public class ActiveMQMessage implements javax.jms.Message 
{
    public void acknowledge() throws JMSException {
       if (session != null) {
          try {
+            if (session.isClosed()) {
+               throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed();
+            }
             if (individualAck) {
                message.individualAcknowledge();
             }
-
-            session.commit();
+            if (clientAck || individualAck) {
+               session.commit(session.isBlockOnAcknowledge());
+            }
          } catch (ActiveMQException e) {
             throw JMSExceptionHelper.convertFromActiveMQException(e);
          }
@@ -777,6 +784,10 @@ public class ActiveMQMessage implements javax.jms.Message {
       this.individualAck = true;
    }
 
+   public void setClientAcknowledge() {
+      this.clientAck = true;
+   }
+
    public void resetMessageID(final String newMsgID) {
       this.msgID = newMsgID;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index 3d7fa56..4664bb9 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -237,6 +237,9 @@ public final class ActiveMQMessageConsumer implements 
QueueReceiver, TopicSubscr
             // https://issues.jboss.org/browse/JBPAPP-6110
             if (session.getAcknowledgeMode() == 
ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) {
                jmsMsg.setIndividualAcknowledge();
+            } else if (session.getAcknowledgeMode() == 
Session.CLIENT_ACKNOWLEDGE) {
+               jmsMsg.setClientAcknowledge();
+               coreMessage.acknowledge();
             } else {
                coreMessage.acknowledge();
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
index 92ae226..5d9f6ed 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
@@ -41,6 +41,8 @@ public class JMSMessageListenerWrapper implements 
MessageHandler {
 
    private final boolean individualACK;
 
+   private final boolean clientACK;
+
    protected JMSMessageListenerWrapper(final ConnectionFactoryOptions options,
                                        final ActiveMQConnection connection,
                                        final ActiveMQSession session,
@@ -60,6 +62,8 @@ public class JMSMessageListenerWrapper implements 
MessageHandler {
       transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || 
ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA();
 
       individualACK = (ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
+
+      clientACK = (ackMode == Session.CLIENT_ACKNOWLEDGE);
    }
 
    /**
@@ -74,11 +78,14 @@ public class JMSMessageListenerWrapper implements 
MessageHandler {
          msg.setIndividualAcknowledge();
       }
 
+      if (clientACK) {
+         msg.setClientAcknowledge();
+      }
+
       try {
          msg.doBeforeReceive();
       } catch (Exception e) {
          
ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(),
 e);
-
          return;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
index d242da8..5cefbd0 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
@@ -179,9 +179,10 @@ public class JmsConsumerTest extends JMSTestBase {
       }
 
       SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+      conn.close();
+
       Assert.assertEquals(0, ((Queue) 
server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
       Assert.assertEquals(0, getMessageCount((Queue) 
server.getPostOffice().getBinding(queueName).getBindable()));
-      conn.close();
    }
 
    @Test
@@ -225,9 +226,10 @@ public class JmsConsumerTest extends JMSTestBase {
       }
 
       SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+      context.close();
+
       Assert.assertEquals(0, ((Queue) 
server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
       Assert.assertEquals(0, getMessageCount((Queue) 
server.getPostOffice().getBinding(queueName).getBindable()));
-      context.close();
    }
 
    @Test
@@ -299,9 +301,10 @@ public class JmsConsumerTest extends JMSTestBase {
       }
 
       SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+      conn.close();
+
       Assert.assertEquals(0, ((Queue) 
server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
       Assert.assertEquals(0, getMessageCount((Queue) 
server.getPostOffice().getBinding(queueName).getBindable()));
-      conn.close();
    }
 
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
----------------------------------------------------------------------
diff --git 
a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
 
b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
index 17927b1..a9ede4b 100644
--- 
a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
+++ 
b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
@@ -19,6 +19,8 @@ package org.apache.activemq.artemis.jms.tests;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -31,6 +33,9 @@ import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
 import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
 import org.junit.Assert;
 import org.junit.Test;
@@ -1297,4 +1302,68 @@ public class AcknowledgementTest extends JMSTestCase {
 
       checkEmpty(queue1);
    }
+
+   /**
+    * Ensure no blocking calls in acknowledge flow when block on acknowledge = 
false.
+    * This is done by checking the performance compared to blocking is much 
improved.
+    */
+   @Test
+   public void testNonBlockingAckPerf() throws Exception {
+      getJmsServerManager().createConnectionFactory("testsuitecf1", false, 
JMSFactoryType.CF, NETTY_CONNECTOR, null, 
ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, 
ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, 
ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, 
ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, 
ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, 
ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, 
ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, 
ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, 
ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, 
ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, true, true, 
ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, 
ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, 
ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, 
ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, 
ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, Ac
 tiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, 
ActiveMQClient.DEFAULT_RETRY_INTERVAL, 
ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, 
ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, 
ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, 
ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/testsuitecf1");
+      getJmsServerManager().createConnectionFactory("testsuitecf2", false, 
JMSFactoryType.CF, NETTY_CONNECTOR, null, 
ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, 
ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, 
ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, 
ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, 
ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, 
ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, 
ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, 
ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, 
ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, 
ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, true, true, 
ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, 
ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, 
ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, 
ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, 
ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, Ac
 tiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, 
ActiveMQClient.DEFAULT_RETRY_INTERVAL, 
ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, 
ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, 
ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, 
ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/testsuitecf2");
+
+      ActiveMQJMSConnectionFactory cf1 = (ActiveMQJMSConnectionFactory) 
getInitialContext().lookup("/testsuitecf1");
+      cf1.setBlockOnAcknowledge(false);
+      ActiveMQJMSConnectionFactory cf2 = (ActiveMQJMSConnectionFactory) 
getInitialContext().lookup("/testsuitecf2");
+      cf2.setBlockOnAcknowledge(true);
+
+      int messageCount = 10000;
+
+      long sendT1 = send(cf1, queue1, messageCount);
+      long sendT2 = send(cf2, queue2, messageCount);
+
+      long time1 = consume(cf1, queue1, messageCount);
+      long time2 = consume(cf2, queue2, messageCount);
+
+      log.info("BlockOnAcknowledge=false MessageCount=" + messageCount + " 
TimeToConsume=" + time1);
+      log.info("BlockOnAcknowledge=true MessageCount=" + messageCount + " 
TimeToConsume=" + time2);
+
+      Assert.assertTrue(time1 < (time2 / 2));
+
+   }
+
+   private long send(ConnectionFactory connectionFactory, Destination 
destination, int messageCount) throws JMSException {
+      try (Connection connection = connectionFactory.createConnection()) {
+         connection.start();
+         try (Session session = connection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE)) {
+            MessageProducer producer = session.createProducer(destination);
+            Message m = session.createTextMessage("testing123");
+            long start = System.nanoTime();
+            for (int i = 0; i < messageCount; i++) {
+               producer.send(m);
+            }
+            session.commit();
+            long end = System.nanoTime();
+            return end - start;
+         }
+      }
+   }
+
+   private long consume(ConnectionFactory connectionFactory, Destination 
destination, int messageCount) throws JMSException {
+      try (Connection connection = connectionFactory.createConnection()) {
+         connection.start();
+         try (Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE)) {
+            MessageConsumer consumer = session.createConsumer(destination);
+            long start = System.nanoTime();
+            for (int i = 0; i < messageCount; i++) {
+               Message message = consumer.receive(100);
+               if (message != null) {
+                  message.acknowledge();
+               }
+            }
+            long end = System.nanoTime();
+            return end - start;
+         }
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
----------------------------------------------------------------------
diff --git 
a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
 
b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
index efea045..39ea0e3 100644
--- 
a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
+++ 
b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
@@ -1276,6 +1276,10 @@ public class MessageHeaderTest extends 
MessageHeaderTestBase {
       }
 
       @Override
+      public void commit(boolean block) throws ActiveMQException {
+      }
+
+      @Override
       public boolean isRollbackOnly() {
 
          return false;

Reply via email to