Repository: qpid-jms
Updated Branches:
  refs/heads/master ff7f49fe6 -> b5f00d23a


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
index b041f51..f0726f9 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
@@ -33,8 +33,7 @@ import javax.jms.Queue;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
 
-import org.apache.qpid.jms.JmsConnection;
-import org.apache.qpid.jms.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
@@ -89,7 +88,7 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
 
             // Expected the browser to create a consumer and send credit.
             testPeer.expectQueueBrowserAttach();
-            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
             testPeer.expectDetach(true, true, true);
 
             QueueBrowser browser = session.createBrowser(queue);
@@ -119,9 +118,9 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
             // no message is there to satisfy an internal hasMoreElements 
check, then send more
             // credit to reopen a window.
             testPeer.expectQueueBrowserAttach();
-            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
-            testPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
-            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            testPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
             testPeer.expectDetach(true, true, true);
 
             QueueBrowser browser = session.createBrowser(queue);
@@ -146,11 +145,9 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
         final DescribedType amqpValueNullContent = new 
AmqpValueDescribedType(null);
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.prefetchPolicy.all=1");
             connection.start();
 
-            connection.getPrefetchPolicy().setAll(1);
-
             testPeer.expectBegin();
 
             Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -195,13 +192,13 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
 
             // Expected the browser to create a consumer and send credit
             testPeer.expectQueueBrowserAttach();
-            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
             // Then expect it to drain it when no message arrives before 
hasMoreElements is called,
             // at which point we send one, and a response flow to indicate the 
rest of the credit was drained.
             testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
amqpValueNullContent,
-                1, true, true, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)),
 1, true, false);
+                1, true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)),
 1, true, false);
             // Expect the credit window to be opened again.
-            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
             testPeer.expectDetach(true, true, true);
 
             QueueBrowser browser = session.createBrowser(queue);
@@ -230,13 +227,13 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
 
             // Expected the browser to create a consumer and send credit
             testPeer.expectQueueBrowserAttach();
-            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
             // Then expect it to drain it when no message arrives before 
hasMoreElements is called,
             // at which point we send one, and a response flow to indicate the 
rest of the credit was drained.
             testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
amqpValueNullContent,
-                1, true, true, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)),
 1, true, false);
+                1, true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)),
 1, true, false);
             // Expect the credit window to be opened again.
-            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
             testPeer.expectDetach(true, true, true);
 
             QueueBrowser browser = session.createBrowser(queue);
@@ -277,13 +274,13 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
             // Expect the browser enumeration to create a underlying consumer
             testPeer.expectQueueBrowserAttach();
             // Expect initial credit to be sent
-            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
             // Then expect it to drain it when no message arrives before 
hasMoreElements is called,
             // at which point we send one, and a response flow to indicate the 
rest of the credit was drained.
             testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
amqpValueNullContent,
-                1, true, true, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)),
 1, true, false);
+                1, true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)),
 1, true, false);
             // Expect a non-draining flow to reopen the credit window again 
afterwards
-            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            testPeer.expectLinkFlow(false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
 
             QueueBrowser browser = session.createBrowser(queue);
             Enumeration<?> queueView = browser.getEnumeration();
@@ -304,11 +301,9 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
     @Test(timeout=30000)
     public void testCreateQueueBrowserAndEnumerationZeroPrefetch() throws 
IOException, Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.prefetchPolicy.all=0");
             connection.start();
 
-            connection.getPrefetchPolicy().setAll(0);
-
             testPeer.expectBegin();
 
             Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -331,11 +326,9 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
     @Test(timeout=30000)
     public void testQueueBrowserHasMoreElementsZeroPrefetchNoMessage() throws 
IOException, Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.prefetchPolicy.all=0");
             connection.start();
 
-            connection.getPrefetchPolicy().setAll(0);
-
             testPeer.expectBegin();
 
             Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -362,11 +355,9 @@ public class QueueBrowserIntegrationTest extends 
QpidJmsTestCase {
         DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.prefetchPolicy.all=0");
             connection.start();
 
-            connection.getPrefetchPolicy().setAll(0);
-
             testPeer.expectBegin();
 
             Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 53a4c43..fe9b4b4 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -59,7 +59,7 @@ import javax.jms.TopicSubscriber;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
-import org.apache.qpid.jms.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
@@ -1197,11 +1197,9 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             final int COUNT = 5;
 
-            Connection connection = testFixture.establishConnecton(testPeer);
+            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.redeliveryPolicy.maxRedeliveries=1");
             connection.start();
 
-            ((JmsConnection) 
connection).getRedeliveryPolicy().setMaxRedeliveries(1);
-
             testPeer.expectBegin();
 
             Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -1242,9 +1240,8 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
     @Test(timeout=20000)
     public void testPrefetchPolicyInfluencesCreditFlow() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
-            int newPrefetch = 263;
-            ((JmsConnection) 
connection).getPrefetchPolicy().setAll(newPrefetch);
+            final int newPrefetch = 263;
+            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.prefetchPolicy.all=" + newPrefetch);
             connection.start();
 
             testPeer.expectBegin();
@@ -1492,7 +1489,7 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
 
             int messageCount = 10;
             testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"),
-                    messageCount, false, false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, 
true);
+                    messageCount, false, false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)),
 1, true);
 
             Queue queue = session.createQueue("myQueue");
             MessageConsumer consumer = session.createConsumer(queue);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 2ce34fd..e7f2b89 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -40,7 +40,7 @@ import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
-import org.apache.qpid.jms.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
@@ -451,8 +451,8 @@ public class TransactionsIntegrationTest extends 
QpidJmsTestCase {
 
             testPeer.expectReceiverAttach();
             testPeer.expectLinkFlow();
-            testPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
-            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+            testPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
             testPeer.expectDetach(true, true, true);
 
             MessageConsumer messageConsumer = session.createConsumer(queue);
@@ -498,8 +498,8 @@ public class TransactionsIntegrationTest extends 
QpidJmsTestCase {
 
             testPeer.expectReceiverAttach();
             testPeer.expectLinkFlow();
-            testPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
-            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+            testPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
+            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)));
             testPeer.expectDetach(true, true, true);
 
             MessageConsumer messageConsumer = session.createConsumer(queue);
@@ -678,9 +678,8 @@ public class TransactionsIntegrationTest extends 
QpidJmsTestCase {
     @Test(timeout=20000)
     public void 
testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws 
Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
-            int messageCount = 5;
-            ((JmsConnection) 
connection).getPrefetchPolicy().setAll(messageCount);
+            final int messageCount = 5;
+            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.prefetchPolicy.all=" + messageCount);
             connection.start();
 
             testPeer.expectBegin();
@@ -754,9 +753,8 @@ public class TransactionsIntegrationTest extends 
QpidJmsTestCase {
     @Test(timeout=20000)
     public void 
testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer()
 throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
-            int messageCount = 5;
-            ((JmsConnection) 
connection).getPrefetchPolicy().setAll(messageCount);
+            final int messageCount = 5;
+            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.prefetchPolicy.all=" + messageCount);
             connection.start();
 
             testPeer.expectBegin();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 64af655..8eaf707 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -49,8 +49,8 @@ import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
-import org.apache.qpid.jms.JmsPrefetchPolicy;
 import org.apache.qpid.jms.JmsSendTimedOutException;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
@@ -442,7 +442,7 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
             originalPeer.dropAfterLastHandler();
 
             final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, finalPeer);
-            connection.getPrefetchPolicy().setQueuePrefetch(0);
+            ((JmsDefaultPrefetchPolicy) 
connection.getPrefetchPolicy()).setQueuePrefetch(0);
             connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
                 @Override
                 public void onConnectionEstablished(URI remoteURI) {
@@ -515,7 +515,7 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
             originalPeer.dropAfterLastHandler();
 
             final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, finalPeer);
-            connection.getPrefetchPolicy().setQueuePrefetch(0);
+            ((JmsDefaultPrefetchPolicy) 
connection.getPrefetchPolicy()).setQueuePrefetch(0);
             connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
                 @Override
                 public void onConnectionEstablished(URI remoteURI) {
@@ -591,7 +591,7 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
             originalPeer.dropAfterLastHandler();
 
             final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, finalPeer);
-            connection.getPrefetchPolicy().setQueuePrefetch(0);
+            ((JmsDefaultPrefetchPolicy) 
connection.getPrefetchPolicy()).setQueuePrefetch(0);
             connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
                 @Override
                 public void onConnectionEstablished(URI remoteURI) {
@@ -689,7 +689,7 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
             originalPeer.expectBegin();
             originalPeer.expectQueueBrowserAttach();
             originalPeer.expectLinkFlow();
-            originalPeer.expectLinkFlow(true, false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            originalPeer.expectLinkFlow(true, false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
             originalPeer.dropAfterLastHandler();
 
             // --- Post Failover Expectations of FinalPeer --- //
@@ -698,9 +698,9 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
             finalPeer.expectBegin();
             finalPeer.expectBegin();
             finalPeer.expectQueueBrowserAttach();
-            finalPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
-            finalPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
-            finalPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            finalPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            finalPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            finalPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
             finalPeer.expectDetach(true, true, true);
 
             Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
index 5cb45db..86e9a43 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
@@ -34,6 +34,7 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.support.AmqpTestSupport;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -144,7 +145,7 @@ public class JmsQueueBrowserTest extends AmqpTestSupport {
         connection = createAmqpConnection();
 
         JmsConnection jmsConnection = (JmsConnection) connection;
-        jmsConnection.getPrefetchPolicy().setQueueBrowserPrefetch(1);
+        ((JmsDefaultPrefetchPolicy) 
jmsConnection.getPrefetchPolicy()).setQueueBrowserPrefetch(1);
 
         connection.start();
 
@@ -175,7 +176,7 @@ public class JmsQueueBrowserTest extends AmqpTestSupport {
         connection = createAmqpConnection();
 
         JmsConnection jmsConnection = (JmsConnection) connection;
-        jmsConnection.getPrefetchPolicy().setQueueBrowserPrefetch(0);
+        ((JmsDefaultPrefetchPolicy) 
jmsConnection.getPrefetchPolicy()).setQueueBrowserPrefetch(0);
 
         connection.start();
 
@@ -274,7 +275,7 @@ public class JmsQueueBrowserTest extends AmqpTestSupport {
     @Test(timeout = 60000)
     public void testBrowseAllInQueueSmallPrefetch() throws Exception {
         connection = createAmqpConnection();
-        ((JmsConnection) 
connection).getPrefetchPolicy().setQueueBrowserPrefetch(10);
+        ((JmsDefaultPrefetchPolicy) ((JmsConnection) 
connection).getPrefetchPolicy()).setQueueBrowserPrefetch(1);
         connection.start();
 
         final int MSG_COUNT = 30;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
index f56fbfa..9e780fb 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
@@ -30,6 +30,7 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy;
 import org.apache.qpid.jms.support.AmqpTestSupport;
 import org.apache.qpid.jms.support.Wait;
 import org.junit.Test;
@@ -39,10 +40,14 @@ import org.junit.Test;
  */
 public class JmsZeroPrefetchTest extends AmqpTestSupport {
 
+    @Override
+    public String getAmqpConnectionURIOptions() {
+        return "jms.prefetchPolicy.all=0";
+    }
+
     @Test(timeout = 60000)
     public void testBlockingReceivesUnBlocksOnMessageSend() throws Exception {
         connection = createAmqpConnection();
-        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
         connection.start();
 
         final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -83,7 +88,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
     @Test(timeout = 60000)
     public void testReceiveTimesOutAndRemovesCredit() throws Exception {
         connection = createAmqpConnection();
-        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
         connection.start();
 
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -113,7 +117,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
     @Test(timeout = 60000)
     public void testReceiveNoWaitWaitForSever() throws Exception {
         connection = createAmqpConnection();
-        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
         connection.start();
 
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -146,7 +149,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
     @Test(timeout = 60000)
     public void testRepeatedPullAttempts() throws Exception {
         connection = createAmqpConnection();
-        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
         connection.start();
 
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -169,7 +171,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
     @Test(timeout = 60000)
     public void testPullConsumerOnlyRequestsOneMessage() throws Exception {
         connection = createAmqpConnection();
-        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
         connection.start();
 
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -205,7 +206,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
     @Test(timeout = 60000)
     public void testTwoConsumers() throws Exception {
         connection = createAmqpConnection();
-        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
         connection.start();
 
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -232,7 +232,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
     @Test(timeout = 60000)
     public void testConsumerWithNoMessageDoesNotHogMessages() throws Exception 
{
         connection = createAmqpConnection();
-        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
         connection.start();
 
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -276,11 +275,13 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
         session.rollback();
         session.close();
 
+        JmsDefaultRedeliveryPolicy redeliveryPolicy = new 
JmsDefaultRedeliveryPolicy();
+        redeliveryPolicy.setMaxRedeliveries(0);
+
         // Reconnect with zero prefetch and zero redeliveries allowed.
         connection.close();
         connection = createAmqpConnection();
-        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
-        
((JmsConnection)connection).getRedeliveryPolicy().setMaxRedeliveries(0);
+        ((JmsConnection)connection).setRedeliveryPolicy(redeliveryPolicy);
         connection.start();
 
         // try consume with timeout - expect it to timeout and return NULL 
message

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
index e5b7766..dd33bdc 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java
@@ -39,7 +39,6 @@ import javax.jms.Session;
 import javax.jms.Topic;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.support.AmqpTestSupport;
 import org.apache.qpid.jms.support.Wait;
@@ -339,7 +338,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
 
     @Test(timeout = 30000)
     public void testPullConsumerTimedReceiveRecovers() throws Exception {
-        URI brokerURI = new URI(getAmqpFailoverURI());
+        URI brokerURI = new URI(getAmqpFailoverURI() + 
"?jms.prefetchPolicy.all=0");
 
         final CountDownLatch started = new CountDownLatch(1);
         final CountDownLatch received = new CountDownLatch(1);
@@ -347,10 +346,6 @@ public class JmsFailoverTest extends AmqpTestSupport {
         connection = createAmqpConnection(brokerURI);
         connection.start();
 
-        // Make all our consumers pull consumers.
-        JmsConnection jmsConnection = (JmsConnection) connection;
-        jmsConnection.getPrefetchPolicy().setAll(0);
-
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createQueue(name.getMethodName());
         final MessageConsumer consumer = session.createConsumer(queue);
@@ -403,7 +398,7 @@ public class JmsFailoverTest extends AmqpTestSupport {
 
     @Test(timeout = 30000)
     public void testPullConsumerReceiveRecovers() throws Exception {
-        URI brokerURI = new URI(getAmqpFailoverURI());
+        URI brokerURI = new URI(getAmqpFailoverURI() + 
"?jms.prefetchPolicy.all=0");
 
         final CountDownLatch started = new CountDownLatch(1);
         final CountDownLatch received = new CountDownLatch(1);
@@ -411,10 +406,6 @@ public class JmsFailoverTest extends AmqpTestSupport {
         connection = createAmqpConnection(brokerURI);
         connection.start();
 
-        // Make all our consumers pull consumers.
-        JmsConnection jmsConnection = (JmsConnection) connection;
-        jmsConnection.getPrefetchPolicy().setAll(0);
-
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createQueue(name.getMethodName());
         final MessageConsumer consumer = session.createConsumer(queue);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to