Repository: activemq Updated Branches: refs/heads/master 5d9f1cd3d -> ffee8b442
https://issues.apache.org/jira/browse/AMQ-6422 - match proton sender view credit to prefetchExtension - tracking credit to dispatch delta to track additional flow requests. Proton sender layer is distinct from the transport layer - they mirror each other Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ffee8b44 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ffee8b44 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ffee8b44 Branch: refs/heads/master Commit: ffee8b442f57b38d57a59745b9062e8d963c65ba Parents: 5d9f1cd Author: gtully <gary.tu...@gmail.com> Authored: Wed Sep 21 10:33:20 2016 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Wed Sep 21 10:33:20 2016 +0100 ---------------------------------------------------------------------- .../transport/amqp/protocol/AmqpSender.java | 91 ++++++++++---------- .../amqp/JMSClientTransactionTest.java | 6 -- .../amqp/interop/AmqpSendReceiveTest.java | 8 +- .../activemq/broker/region/AbstractRegion.java | 2 +- .../broker/region/AbstractSubscription.java | 12 +++ .../broker/region/PrefetchSubscription.java | 10 +-- .../broker/region/QueueSubscription.java | 2 +- .../broker/region/TopicSubscription.java | 21 ++++- .../TopicSubscriptionZeroPrefetchTest.java | 19 ++++ 9 files changed, 103 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 0b85858..75f2371 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -20,8 +20,9 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; import java.io.IOException; import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.AbstractSubscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConsumerControl; @@ -81,7 +82,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT"; private final ConsumerInfo consumerInfo; - private Subscription subscription; + private AbstractSubscription subscription; + private AtomicInteger prefetchExtension; + private int currentCreditRequest; + private int logicalDeliveryCount; // echoes prefetch extension but from protons perspective private final boolean presettle; private boolean draining; @@ -111,7 +115,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { public void open() { if (!isClosed()) { session.registerSender(getConsumerId(), this); - subscription = session.getConnection().lookupPrefetchSubscription(consumerInfo); + subscription = (AbstractSubscription)session.getConnection().lookupPrefetchSubscription(consumerInfo); + prefetchExtension = subscription.getPrefetchExtension(); } super.open(); @@ -168,24 +173,15 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { public void flow() throws Exception { Link endpoint = getEndpoint(); if (LOG.isTraceEnabled()) { - LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}, unsettled={}", + LOG.trace("Flow: draining={}, drain={} credit={}, currentCredit={}, senderDeliveryCount={} - Sub={}", draining, endpoint.getDrain(), - endpoint.getCredit(), endpoint.getRemoteCredit(), endpoint.getQueued(), endpoint.getUnsettled()); + endpoint.getCredit(), currentCreditRequest, logicalDeliveryCount, subscription); } + final int endpointCredit = endpoint.getCredit(); if (endpoint.getDrain() && !draining) { - // Revert to a pull consumer. - ConsumerControl control = new ConsumerControl(); - control.setConsumerId(getConsumerId()); - control.setDestination(getDestination()); - control.setPrefetch(0); - - LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output"); - - sendToActiveMQ(control); - - if (endpoint.getCredit() > 0) { + if (endpointCredit > 0) { draining = true; // Now request dispatch of the drain amount, we request immediate @@ -196,9 +192,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { pullRequest.setDestination(getDestination()); pullRequest.setTimeout(-1); pullRequest.setAlwaysSignalDone(true); - pullRequest.setQuantity(endpoint.getCredit()); + pullRequest.setQuantity(endpointCredit); - LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit()); + LOG.trace("Pull case -> consumer pull request quantity = {}", endpointCredit); sendToActiveMQ(pullRequest); } else { @@ -207,25 +203,36 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { pumpOutbound(); getEndpoint().drained(); session.pumpProtonToSocket(); + currentCreditRequest = 0; + logicalDeliveryCount = 0; } - } else { - ConsumerControl control = new ConsumerControl(); - control.setConsumerId(getConsumerId()); - control.setDestination(getDestination()); - - int remoteCredit = endpoint.getRemoteCredit(); - if (remoteCredit > 0 && subscription != null) { - // ensure prefetch exceeds credit + inflight - if (remoteCredit + endpoint.getUnsettled() + endpoint.getQueued() > subscription.getPrefetchSize()) { - LOG.trace("Adding dispatched size to credit for sub: " + subscription); - remoteCredit += subscription.getDispatchedQueueSize(); - } - } - control.setPrefetch(remoteCredit); + } else if (endpointCredit >= 0) { + + if (endpointCredit == 0 && currentCreditRequest != 0) { + + prefetchExtension.set(0); + currentCreditRequest = 0; + logicalDeliveryCount = 0; + LOG.trace("Flow: credit 0 for sub:" + subscription); - LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch()); + } else { - sendToActiveMQ(control); + int deltaToAdd = endpointCredit; + int logicalCredit = currentCreditRequest - logicalDeliveryCount; + if (logicalCredit > 0) { + deltaToAdd -= logicalCredit; + } else { + // reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative + logicalDeliveryCount = 0; + } + if (deltaToAdd > 0) { + currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd); + subscription.wakeupDestinationsForDispatch(); + // force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch) + subscription.setPrefetchSize(0); + LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription); + } + } } } @@ -285,6 +292,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { } pumpOutbound(); + logicalDeliveryCount++; } @Override @@ -440,6 +448,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { // It's the end of browse signal in response to a MessagePull getEndpoint().drained(); draining = false; + currentCreditRequest = 0; + logicalDeliveryCount = 0; } else { if (LOG.isTraceEnabled()) { LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}", @@ -451,6 +461,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName()); getEndpoint().drained(); draining = false; + currentCreditRequest = 0; + logicalDeliveryCount = 0; } jms.setRedeliveryCounter(md.getRedeliveryCounter()); @@ -481,17 +493,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { tagCache.returnTag(tag); } - int newCredit = Math.max(0, getEndpoint().getCredit() - 1); - LOG.trace("Sender:[{}] updating conumser prefetch:{} after delivery settled.", - getEndpoint().getName(), newCredit); - - ConsumerControl control = new ConsumerControl(); - control.setConsumerId(getConsumerId()); - control.setDestination(getDestination()); - control.setPrefetch(newCredit); - - sendToActiveMQ(control); - if (ackType == -1) { // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ delivery.settle(); http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java index 1251410..f481ba9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java @@ -193,8 +193,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize()); SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName()); assertNotNull(subscription); - LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize()); - assertTrue(subscription.getPrefetchSize() > 0); for (int i = 1; i <= MSG_COUNT; i++) { LOG.info("Trying to receive message: {}", i); @@ -259,8 +257,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize()); SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName()); assertNotNull(subscription); - LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize()); - assertTrue(subscription.getPrefetchSize() > 0); assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() { @@ -273,7 +269,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { LOG.info("COMMIT of first received batch here:"); session.commit(); - assertTrue(subscription.getPrefetchSize() > 0); for (int i = 1; i <= MSG_COUNT; i++) { LOG.info("Sending message: {} to commit", msgIndex++); TextMessage message = session.createTextMessage("Commit Message: " + msgIndex); @@ -286,7 +281,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { LOG.info("WAITING -> for next three messages to arrive:"); - assertTrue(subscription.getPrefetchSize() > 0); assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() { @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java index 3132e6e..34436f2 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -294,7 +294,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver2.flow(splitCredit); for (int i = 0; i < splitCredit; i++) { AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS); - assertNotNull("Receiver #2 should have read a message", message); + assertNotNull("Receiver #2 should have read message[" + i + "]", message); LOG.info("Receiver #2 read message: {}", message.getMessageId()); message.accept(); } @@ -671,7 +671,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { LOG.info("*** Attempting to read remaining messages with both receivers"); int splitCredit = (MSG_COUNT - 4) / 2; - LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit); + LOG.info("**** Receiver #1 granting credit[{}] for its block of messages", splitCredit); receiver1.flow(splitCredit); for (int i = 0; i < splitCredit; i++) { AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS); @@ -680,11 +680,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { message.accept(); } - LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit); + LOG.info("**** Receiver #2 granting credit[{}] for its block of messages", splitCredit); receiver2.flow(splitCredit); for (int i = 0; i < splitCredit; i++) { AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS); - assertNotNull("Receiver #2 should have read a message", message); + assertNotNull("Receiver #2 should have read a message[" + i + "]", message); LOG.info("Receiver #2 read message: {}", message.getMessageId()); message.accept(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index be77b6e..13251c8 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -688,7 +688,7 @@ public abstract class AbstractRegion implements Region { entry.configurePrefetch(sub); } } - LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getCurrentPrefetchSize()}); + LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()}); try { lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 1d84269..3cb2f1f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -49,6 +50,7 @@ public abstract class AbstractSubscription implements Subscription { protected ConsumerInfo info; protected final DestinationFilter destinationFilter; protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>(); + protected final AtomicInteger prefetchExtension = new AtomicInteger(0); private BooleanExpression selectorExpression; private ObjectName objectName; @@ -309,4 +311,14 @@ public abstract class AbstractSubscription implements Subscription { public SubscriptionStatistics getSubscriptionStatistics() { return subscriptionStatistics; } + + public void wakeupDestinationsForDispatch() { + for (Destination dest : destinations) { + dest.wakeup(); + } + } + + public AtomicInteger getPrefetchExtension() { + return this.prefetchExtension; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 5254440..0a277fb 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -23,7 +23,6 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import javax.jms.JMSException; @@ -57,7 +56,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { protected PendingMessageCursor pending; protected final List<MessageReference> dispatched = new ArrayList<MessageReference>(); - protected final AtomicInteger prefetchExtension = new AtomicInteger(); protected boolean usePrefetchExtension = true; private int maxProducersToAudit=32; private int maxAuditDepth=2048; @@ -431,9 +429,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { dispatchPending(); if (pending.isEmpty()) { - for (Destination dest : destinations) { - dest.wakeup(); - } + wakeupDestinationsForDispatch(); } } else { LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack); @@ -904,10 +900,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { this.usePrefetchExtension = usePrefetchExtension; } - protected int getPrefetchExtension() { - return this.prefetchExtension.get(); - } - @Override public void setPrefetchSize(int prefetchSize) { this.info.setPrefetchSize(prefetchSize); http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 358f946..6e865ec 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -69,7 +69,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner @Override public synchronized String toString() { return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered=" - + this.prefetchExtension + ", pending=" + getPendingQueueSize(); + + this.prefetchExtension + ", pending=" + getPendingQueueSize() + ", prefetch=" + getPrefetchSize() + ", prefetchExtension=" + prefetchExtension.get(); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index eff2393..6ab264d 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -64,7 +64,6 @@ public class TopicSubscription extends AbstractSubscription { private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); private int discarded; private final Object matchedListMutex = new Object(); - private final AtomicInteger prefetchExtension = new AtomicInteger(0); private int memoryUsageHighWaterMark = 95; // allow duplicate suppression in a ring network of brokers protected int maxProducersToAudit = 1024; @@ -410,6 +409,16 @@ public class TopicSubscription extends AbstractSubscription { } } + private void decrementPrefetchExtension() { + while (true) { + int currentExtension = prefetchExtension.get(); + int newExtension = Math.max(0, currentExtension - 1); + if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { + break; + } + } + } + @Override public int countBeforeFull() { return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize(); @@ -529,6 +538,9 @@ public class TopicSubscription extends AbstractSubscription { // ------------------------------------------------------------------------- @Override public boolean isFull() { + if (info.getPrefetchSize() == 0) { + return prefetchExtension.get() == 0; + } return getDispatchedQueueSize() >= info.getPrefetchSize(); } @@ -655,6 +667,11 @@ public class TopicSubscription extends AbstractSubscription { } } } + + if (getPrefetchSize() == 0) { + decrementPrefetchExtension(); + } + } if (info.isDispatchAsync()) { if (node != null) { @@ -712,7 +729,7 @@ public class TopicSubscription extends AbstractSubscription { @Override public String toString() { return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" - + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded(); + + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get(); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java index b9f0d50..38fa921 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java @@ -22,6 +22,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; @@ -75,6 +76,24 @@ public class TopicSubscriptionZeroPrefetchTest { Assert.assertNotNull("should have received a message the published message", consumedMessage); } + @Test(timeout=60000) + public void testTopicConsumerPrefetchZeroClientAckLoop() throws Exception { + ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0"); + Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = consumerClientAckSession.createConsumer(consumerDestination); + + final int count = 10; + for (int i=0;i<count;i++) { + Message txtMessage = session.createTextMessage("M:"+ i); + producer.send(txtMessage); + } + + for (int i=0;i<count;i++) { + Message consumedMessage = consumer.receive(2000); + Assert.assertNotNull("should have received message[" + i +"]", consumedMessage); + } + } + /* * test durable topic subscription with prefetch zero */