Repository: activemq
Updated Branches:
  refs/heads/master 2e953d96a -> 6e17c2a5a


 AMQ-6577: honour usePrefetchExtension in TopicSubscription.

 AMQ-6577: move usePrefetchExtension flag to AbstractSubscription to promote reuse.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/687badb4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/687badb4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/687badb4

Branch: refs/heads/master
Commit: 687badb4fdd1b738103ab409f4342a82fb0e42fd
Parents: 2e953d9
Author: Vasco Veloso <[email protected]>
Authored: Wed Jan 25 10:08:37 2017 +0000
Committer: Christopher L. Shannon (cshannon) <[email protected]>
Committed: Mon Jan 30 11:13:57 2017 -0500

----------------------------------------------------------------------
 .../broker/region/AbstractSubscription.java     |   9 +
 .../broker/region/PrefetchSubscription.java     |  17 +-
 .../broker/region/TopicSubscription.java        |   7 +-
 .../broker/region/policy/PolicyEntry.java       |   1 +
 .../java/org/apache/activemq/TestSupport.java   |  13 +-
 .../activemq/usecases/ExpiredMessagesTest.java  | 235 ++++++++++++-------
 6 files changed, 183 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/687badb4/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index 3cb2f1f..9a51a83 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -52,6 +52,7 @@ public abstract class AbstractSubscription implements 
Subscription {
     protected final CopyOnWriteArrayList<Destination> destinations = new 
CopyOnWriteArrayList<Destination>();
     protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
 
+    private boolean usePrefetchExtension = true;
     private BooleanExpression selectorExpression;
     private ObjectName objectName;
     private int cursorMemoryHighWaterMark = 70;
@@ -185,6 +186,14 @@ public abstract class AbstractSubscription implements 
Subscription {
         return info.getPrefetchSize();
     }
 
+    public boolean isUsePrefetchExtension() {
+        return usePrefetchExtension;
+    }
+
+    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
+        this.usePrefetchExtension = usePrefetchExtension;
+    }
+
     public void setPrefetchSize(int newSize) {
         info.setPrefetchSize(newSize);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/687badb4/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index db133c1..314285f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -56,7 +56,6 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
 
     protected PendingMessageCursor pending;
     protected final List<MessageReference> dispatched = new 
ArrayList<MessageReference>();
-    protected boolean usePrefetchExtension = true;
     private int maxProducersToAudit=32;
     private int maxAuditDepth=2048;
     protected final SystemUsage usageManager;
@@ -263,7 +262,7 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
                             registerRemoveSync(context, node);
                         }
 
-                        if (usePrefetchExtension && getPrefetchSize() != 0 && 
ack.isInTransaction()) {
+                        if (isUsePrefetchExtension() && getPrefetchSize() != 0 
&& ack.isInTransaction()) {
                             // allow transaction batch to exceed prefetch
                             while (true) {
                                 int currentExtension = prefetchExtension.get();
@@ -288,7 +287,7 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
                     final MessageReference node = iter.next();
                     Destination nodeDest = (Destination) 
node.getRegionDestination();
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
-                        if (usePrefetchExtension && getPrefetchSize() != 0) {
+                        if (isUsePrefetchExtension() && getPrefetchSize() != 
0) {
                             // allow  batch to exceed prefetch
                             while (true) {
                                 int currentExtension = prefetchExtension.get();
@@ -328,7 +327,7 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
                         
nodeDest.getDestinationStatistics().getInflight().decrement();
 
                         if (ack.getLastMessageId().equals(messageId)) {
-                            if (usePrefetchExtension && getPrefetchSize() != 
0) {
+                            if (isUsePrefetchExtension() && getPrefetchSize() 
!= 0) {
                                 // allow  batch to exceed prefetch
                                 while (true) {
                                     int currentExtension = 
prefetchExtension.get();
@@ -444,7 +443,7 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
 
                     @Override
                     public void beforeEnd() {
-                        if (usePrefetchExtension && getPrefetchSize() != 0) {
+                        if (isUsePrefetchExtension() && getPrefetchSize() != 
0) {
                             while (true) {
                                 int currentExtension = prefetchExtension.get();
                                 int newExtension = Math.max(0, 
currentExtension - 1);
@@ -892,14 +891,6 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
         }
     }
 
-    public boolean isUsePrefetchExtension() {
-        return usePrefetchExtension;
-    }
-
-    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
-        this.usePrefetchExtension = usePrefetchExtension;
-    }
-
     @Override
     public void setPrefetchSize(int prefetchSize) {
         this.info.setPrefetchSize(prefetchSize);

http://git-wip-us.apache.org/repos/asf/activemq/blob/687badb4/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 095735a..65c2ba9 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -422,6 +422,9 @@ public class TopicSubscription extends AbstractSubscription 
{
     }
 
     private void incrementPrefetchExtension(int amount) {
+        if (!isUsePrefetchExtension()) {
+            return;
+        }
         while (true) {
             int currentExtension = prefetchExtension.get();
             int newExtension = Math.max(0, currentExtension + amount);
@@ -748,7 +751,8 @@ public class TopicSubscription extends AbstractSubscription 
{
     @Override
     public String toString() {
         return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", 
destinations=" + destinations.size() + ", dispatched=" + 
getDispatchedQueueSize() + ", delivered="
-                + getDequeueCounter() + ", matched=" + matched() + ", 
discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get();
+                + getDequeueCounter() + ", matched=" + matched() + ", 
discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get()
+                + ", usePrefetchExtension=" + isUsePrefetchExtension();
     }
 
     @Override
@@ -781,4 +785,5 @@ public class TopicSubscription extends AbstractSubscription 
{
             LOG.trace("Caught exception on dispatch after prefetch size 
change.");
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/687badb4/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index b8401a4..5b7ff0e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -313,6 +313,7 @@ public class PolicyEntry extends DestinationMapEntry {
 
     public void configure(Broker broker, SystemUsage memoryManager, 
TopicSubscription subscription) {
         configurePrefetch(subscription);
+        subscription.setUsePrefetchExtension(isUsePrefetchExtension());
         
subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
         if (pendingMessageLimitStrategy != null) {
             int value = 
pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);

http://git-wip-us.apache.org/repos/asf/activemq/blob/687badb4/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
index 877b22a..fa07b8d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
@@ -18,7 +18,7 @@ package org.apache.activemq;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.ServerSocket;
+import java.util.List;
 import java.util.Map;
 
 import javax.jms.Connection;
@@ -28,13 +28,13 @@ import javax.jms.Message;
 import javax.jms.TextMessage;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-import javax.net.ServerSocketFactory;
 
 import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -157,6 +157,15 @@ public abstract class TestSupport extends 
CombinationTestSupport {
         return result;
     }
 
+    public static List<Subscription> getDestinationConsumers(BrokerService 
broker, ActiveMQDestination destination) {
+        List<Subscription> result = null;
+        org.apache.activemq.broker.region.Destination dest = 
getDestination(broker, destination);
+        if (dest != null) {
+            result = dest.getConsumers();
+        }
+        return result;
+    }
+
     public static org.apache.activemq.broker.region.Destination 
getDestination(BrokerService target, ActiveMQDestination destination) {
         org.apache.activemq.broker.region.Destination result = null;
         for (org.apache.activemq.broker.region.Destination dest : 
getDestinationMap(target, destination).values()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/687badb4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
index 0187aad..9b53542 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
@@ -22,11 +22,14 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import 
org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
@@ -34,9 +37,11 @@ import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
 
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.activemq.TestSupport.getDestination;
+import static org.apache.activemq.TestSupport.getDestinationConsumers;
 import static org.apache.activemq.TestSupport.getDestinationStatistics;
 
 public class ExpiredMessagesTest extends CombinationTestSupport {
@@ -48,11 +53,12 @@ public class ExpiredMessagesTest extends 
CombinationTestSupport {
     Session session;
     MessageProducer producer;
     MessageConsumer consumer;
-    public ActiveMQDestination destination = new ActiveMQQueue("test");
-    public ActiveMQDestination dlqDestination = new 
ActiveMQQueue("ActiveMQ.DLQ");
-    public boolean useTextMessage = true;
-    public boolean useVMCursor = true;
-    protected String brokerUri;
+    private ActiveMQDestination dlqDestination = new 
ActiveMQQueue("ActiveMQ.DLQ");
+    private boolean useTextMessage = true;
+    private boolean useVMCursor = true;
+    private boolean deleteAllMessages = true;
+    private boolean usePrefetchExtension = true;
+    private String brokerUri;
 
     public static Test suite() {
         return suite(ExpiredMessagesTest.class);
@@ -64,20 +70,153 @@ public class ExpiredMessagesTest extends 
CombinationTestSupport {
 
     @Override
     protected void setUp() throws Exception {
-        final boolean deleteAllMessages = true;
-        broker = createBroker(deleteAllMessages, 100);
-        brokerUri = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
     }
 
     @Override
     protected void tearDown() throws Exception {
+        if (null != producer) {
+            producer.close();
+        }
+        if (null != consumer) {
+            consumer.close();
+        }
+        session.close();
         connection.stop();
         broker.stop();
         broker.waitUntilStopped();
     }
 
     public void testExpiredMessages() throws Exception {
+        final ActiveMQDestination destination = new ActiveMQQueue("test");
+        final int numMessagesToSend = 10000;
+
+        buildBroker(destination);
+
+        final DestinationStatistics view = 
verifyMessageExpirationOnDestination(destination, numMessagesToSend);
+
+        verifyDestinationDlq(destination, numMessagesToSend, view);
+    }
+
+    public void testExpiredMessages_onTopic_withPrefetchExtension() throws 
Exception {
+        final ActiveMQDestination destination = new ActiveMQTopic("test");
+        final int numMessagesToSend = 10000;
+
+        usePrefetchExtension = true;
+
+        buildBroker(destination);
+
+        verifyMessageExpirationOnDestination(destination, numMessagesToSend);
+        // We don't check the DLQ because non-persistent messages on topics 
are discarded instead.
+
+        final List<Subscription> subscriptions = 
getDestinationConsumers(broker, destination);
+
+        assertTrue("prefetch extension was not incremented",
+            subscriptions.stream().
+                filter(s -> s instanceof TopicSubscription).
+                mapToInt(s -> 
((TopicSubscription)s).getPrefetchExtension().get()).
+                allMatch(e -> e > 0));
+    }
+
+    public void testExpiredMessages_onTopic_withoutPrefetchExtension() throws 
Exception {
+        final ActiveMQDestination destination = new ActiveMQTopic("test");
+        final int numMessagesToSend = 10000;
+
+        usePrefetchExtension = false;
+
+        buildBroker(destination);
 
+        verifyMessageExpirationOnDestination(destination, numMessagesToSend);
+        // We don't check the DLQ because non-persistent messages on topics 
are discarded instead.
+
+        final List<Subscription> subscriptions = 
getDestinationConsumers(broker, destination);
+
+        assertTrue("prefetch extension was incremented",
+                subscriptions.stream().
+                        filter(s -> s instanceof TopicSubscription).
+                        mapToInt(s -> 
((TopicSubscription)s).getPrefetchExtension().get()).
+                        allMatch(e -> e == 0));
+    }
+
+    private void buildBroker(ActiveMQDestination destination) throws Exception 
{
+        broker = createBroker(deleteAllMessages, usePrefetchExtension, 100, 
destination);
+        brokerUri = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    public void testRecoverExpiredMessages() throws Exception {
+        final ActiveMQDestination destination = new ActiveMQQueue("test");
+
+        buildBroker(destination);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                "failover://"+brokerUri);
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(destination);
+        producer.setTimeToLive(2000);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        Thread producingThread = new Thread("Producing Thread") {
+            @Override
+            public void run() {
+                try {
+                    int i = 0;
+                    while (i++ < 1000) {
+                        Message message = useTextMessage ? session
+                                .createTextMessage("test") : session
+                                .createObjectMessage("test");
+                        producer.send(message);
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
+        producingThread.start();
+        producingThread.join();
+
+        DestinationStatistics view = getDestinationStatistics(broker, 
destination);
+        LOG.info("Stats: size: " + view.getMessages().getCount() + ", 
enqueues: "
+                + view.getEnqueues().getCount() + ", dequeues: "
+                + view.getDequeues().getCount() + ", dispatched: "
+                + view.getDispatched().getCount() + ", inflight: "
+                + view.getInflight().getCount() + ", expiries: "
+                + view.getExpired().getCount());
+
+        LOG.info("stopping broker");
+        broker.stop();
+        broker.waitUntilStopped();
+
+        Thread.sleep(5000);
+
+        LOG.info("recovering broker");
+        final boolean deleteAllMessages = false;
+        final boolean usePrefetchExtension = true;
+        broker = createBroker(deleteAllMessages, usePrefetchExtension, 5000, 
destination);
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                DestinationStatistics view = getDestinationStatistics(broker, 
destination);
+                LOG.info("Stats: size: " + view.getMessages().getCount() + ", 
enqueues: "
+                        + view.getEnqueues().getCount() + ", dequeues: "
+                        + view.getDequeues().getCount() + ", dispatched: "
+                        + view.getDispatched().getCount() + ", inflight: "
+                        + view.getInflight().getCount() + ", expiries: "
+                        + view.getExpired().getCount());
+
+                return view.getMessages().getCount() == 0;
+            }
+        });
+
+        view = getDestinationStatistics(broker, destination);
+        assertEquals("Expect empty queue, QueueSize: ", 0, 
view.getMessages().getCount());
+        assertEquals("all dequeues were expired", 
view.getDequeues().getCount(), view.getExpired().getCount());
+    }
+
+    private DestinationStatistics 
verifyMessageExpirationOnDestination(ActiveMQDestination destination, final int 
numMessagesToSend) throws Exception {
         ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(brokerUri);
         connection = factory.createConnection();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -100,7 +239,6 @@ public class ExpiredMessagesTest extends 
CombinationTestSupport {
                         Thread.sleep(100);
                         end = System.currentTimeMillis();
                     }
-                    consumer.close();
                 } catch (Throwable ex) {
                     ex.printStackTrace();
                 }
@@ -109,7 +247,6 @@ public class ExpiredMessagesTest extends 
CombinationTestSupport {
 
         consumerThread.start();
 
-        final int numMessagesToSend = 10000;
         Thread producingThread = new Thread("Producing Thread") {
             @Override
             public void run() {
@@ -118,7 +255,6 @@ public class ExpiredMessagesTest extends 
CombinationTestSupport {
                     while (i++ < numMessagesToSend) {
                         producer.send(session.createTextMessage("test"));
                     }
-                    producer.close();
                 } catch (Throwable ex) {
                     ex.printStackTrace();
                 }
@@ -129,7 +265,6 @@ public class ExpiredMessagesTest extends 
CombinationTestSupport {
 
         consumerThread.join();
         producingThread.join();
-        session.close();
 
         final DestinationStatistics view = getDestinationStatistics(broker, 
destination);
 
@@ -171,7 +306,10 @@ public class ExpiredMessagesTest extends 
CombinationTestSupport {
                 return view.getMessages().getCount() == 0;
             }
         }));
+        return view;
+    }
 
+    private void verifyDestinationDlq(ActiveMQDestination destination, int 
numMessagesToSend, DestinationStatistics view) throws Exception {
         final long expiredBeforeEnqueue = numMessagesToSend - 
view.getEnqueues().getCount();
         final long totalExpiredCount = view.getExpired().getCount() + 
expiredBeforeEnqueue;
 
@@ -225,77 +363,7 @@ public class ExpiredMessagesTest extends 
CombinationTestSupport {
         addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, 
Boolean.FALSE});
     }
 
-    public void testRecoverExpiredMessages() throws Exception {
-
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
-                "failover://"+brokerUri);
-        connection = factory.createConnection();
-        connection.start();
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        producer = session.createProducer(destination);
-        producer.setTimeToLive(2000);
-        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-        Thread producingThread = new Thread("Producing Thread") {
-            @Override
-            public void run() {
-                try {
-                    int i = 0;
-                    while (i++ < 1000) {
-                        Message message = useTextMessage ? session
-                                .createTextMessage("test") : session
-                                .createObjectMessage("test");
-                        producer.send(message);
-                    }
-                    producer.close();
-                } catch (Throwable ex) {
-                    ex.printStackTrace();
-                }
-            }
-        };
-
-        producingThread.start();
-        producingThread.join();
-
-        DestinationStatistics view = getDestinationStatistics(broker, 
destination);
-        LOG.info("Stats: size: " + view.getMessages().getCount() + ", 
enqueues: "
-                + view.getEnqueues().getCount() + ", dequeues: "
-                + view.getDequeues().getCount() + ", dispatched: "
-                + view.getDispatched().getCount() + ", inflight: "
-                + view.getInflight().getCount() + ", expiries: "
-                + view.getExpired().getCount());
-
-        LOG.info("stopping broker");
-        broker.stop();
-        broker.waitUntilStopped();
-
-        Thread.sleep(5000);
-
-        LOG.info("recovering broker");
-        final boolean deleteAllMessages = false;
-        broker = createBroker(deleteAllMessages, 5000);
-
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                DestinationStatistics view = getDestinationStatistics(broker, 
destination);
-                LOG.info("Stats: size: " + view.getMessages().getCount() + ", 
enqueues: "
-                        + view.getEnqueues().getCount() + ", dequeues: "
-                        + view.getDequeues().getCount() + ", dispatched: "
-                        + view.getDispatched().getCount() + ", inflight: "
-                        + view.getInflight().getCount() + ", expiries: "
-                        + view.getExpired().getCount());
-
-                return view.getMessages().getCount() == 0;
-            }
-        });
-
-        view = getDestinationStatistics(broker, destination);
-        assertEquals("Expect empty queue, QueueSize: ", 0, 
view.getMessages().getCount());
-        assertEquals("all dequeues were expired", 
view.getDequeues().getCount(), view.getExpired().getCount());
-    }
-
-    private BrokerService createBroker(boolean deleteAllMessages, long 
expireMessagesPeriod) throws Exception {
+    private BrokerService createBroker(boolean deleteAllMessages, boolean 
usePrefetchExtension, long expireMessagesPeriod, ActiveMQDestination 
destination) throws Exception {
         BrokerService broker = new BrokerService();
         broker.setBrokerName("localhost");
         broker.setDestinations(new ActiveMQDestination[]{destination});
@@ -307,6 +375,7 @@ public class ExpiredMessagesTest extends 
CombinationTestSupport {
         }
         defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
         defaultPolicy.setMaxExpirePageSize(1200);
+        defaultPolicy.setUsePrefetchExtension(usePrefetchExtension);
         PolicyMap policyMap = new PolicyMap();
         policyMap.setDefaultEntry(defaultPolicy);
         broker.setDestinationPolicy(policyMap);

Reply via email to