Repository: qpid-jms Updated Branches: refs/heads/master ff7f49fe6 -> b5f00d23a
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java index b041f51..f0726f9 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java @@ -33,8 +33,7 @@ import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; -import org.apache.qpid.jms.JmsConnection; -import org.apache.qpid.jms.JmsPrefetchPolicy; +import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; @@ -89,7 +88,7 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { // Expected the browser to create a consumer and send credit. testPeer.expectQueueBrowserAttach(); - testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); testPeer.expectDetach(true, true, true); QueueBrowser browser = session.createBrowser(queue); @@ -119,9 +118,9 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { // no message is there to satisfy an internal hasMoreElements check, then send more // credit to reopen a window. testPeer.expectQueueBrowserAttach(); - testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); - testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); - testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); testPeer.expectDetach(true, true, true); QueueBrowser browser = session.createBrowser(queue); @@ -146,11 +145,9 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { final DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=1"); connection.start(); - connection.getPrefetchPolicy().setAll(1); - testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -195,13 +192,13 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { // Expected the browser to create a consumer and send credit testPeer.expectQueueBrowserAttach(); - testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); // Then expect it to drain it when no message arrives before hasMoreElements is called, // at which point we send one, and a response flow to indicate the rest of the credit was drained. testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, - 1, true, true, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false); + 1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false); // Expect the credit window to be opened again. - testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); testPeer.expectDetach(true, true, true); QueueBrowser browser = session.createBrowser(queue); @@ -230,13 +227,13 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { // Expected the browser to create a consumer and send credit testPeer.expectQueueBrowserAttach(); - testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); // Then expect it to drain it when no message arrives before hasMoreElements is called, // at which point we send one, and a response flow to indicate the rest of the credit was drained. testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, - 1, true, true, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false); + 1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false); // Expect the credit window to be opened again. - testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); testPeer.expectDetach(true, true, true); QueueBrowser browser = session.createBrowser(queue); @@ -277,13 +274,13 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { // Expect the browser enumeration to create a underlying consumer testPeer.expectQueueBrowserAttach(); // Expect initial credit to be sent - testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); // Then expect it to drain it when no message arrives before hasMoreElements is called, // at which point we send one, and a response flow to indicate the rest of the credit was drained. testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, - 1, true, true, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false); + 1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false); // Expect a non-draining flow to reopen the credit window again afterwards - testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); QueueBrowser browser = session.createBrowser(queue); Enumeration<?> queueView = browser.getEnumeration(); @@ -304,11 +301,9 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { @Test(timeout=30000) public void testCreateQueueBrowserAndEnumerationZeroPrefetch() throws IOException, Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0"); connection.start(); - connection.getPrefetchPolicy().setAll(0); - testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -331,11 +326,9 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { @Test(timeout=30000) public void testQueueBrowserHasMoreElementsZeroPrefetchNoMessage() throws IOException, Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0"); connection.start(); - connection.getPrefetchPolicy().setAll(0); - testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -362,11 +355,9 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0"); connection.start(); - connection.getPrefetchPolicy().setAll(0); - testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 53a4c43..fe9b4b4 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -59,7 +59,7 @@ import javax.jms.TopicSubscriber; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsDefaultConnectionListener; import org.apache.qpid.jms.JmsOperationTimedOutException; -import org.apache.qpid.jms.JmsPrefetchPolicy; +import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.Wait; @@ -1197,11 +1197,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { final int COUNT = 5; - Connection connection = testFixture.establishConnecton(testPeer); + Connection connection = testFixture.establishConnecton(testPeer, "?jms.redeliveryPolicy.maxRedeliveries=1"); connection.start(); - ((JmsConnection) connection).getRedeliveryPolicy().setMaxRedeliveries(1); - testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1242,9 +1240,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase { @Test(timeout=20000) public void testPrefetchPolicyInfluencesCreditFlow() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); - int newPrefetch = 263; - ((JmsConnection) connection).getPrefetchPolicy().setAll(newPrefetch); + final int newPrefetch = 263; + Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=" + newPrefetch); connection.start(); testPeer.expectBegin(); @@ -1492,7 +1489,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { int messageCount = 10; testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), - messageCount, false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, true); + messageCount, false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, true); Queue queue = session.createQueue("myQueue"); MessageConsumer consumer = session.createConsumer(queue); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java index 2ce34fd..e7f2b89 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java @@ -40,7 +40,7 @@ import javax.jms.TransactionRolledBackException; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsOperationTimedOutException; -import org.apache.qpid.jms.JmsPrefetchPolicy; +import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; @@ -451,8 +451,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.expectReceiverAttach(); testPeer.expectLinkFlow(); - testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH))); - testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH))); + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH))); + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH))); testPeer.expectDetach(true, true, true); MessageConsumer messageConsumer = session.createConsumer(queue); @@ -498,8 +498,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.expectReceiverAttach(); testPeer.expectLinkFlow(); - testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH))); - testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH))); + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH))); + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH))); testPeer.expectDetach(true, true, true); MessageConsumer messageConsumer = session.createConsumer(queue); @@ -678,9 +678,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { @Test(timeout=20000) public void testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); - int messageCount = 5; - ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); + final int messageCount = 5; + Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=" + messageCount); connection.start(); testPeer.expectBegin(); @@ -754,9 +753,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { @Test(timeout=20000) public void testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); - int messageCount = 5; - ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); + final int messageCount = 5; + Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=" + messageCount); connection.start(); testPeer.expectBegin(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index 64af655..8eaf707 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -49,8 +49,8 @@ import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsDefaultConnectionListener; import org.apache.qpid.jms.JmsOperationTimedOutException; -import org.apache.qpid.jms.JmsPrefetchPolicy; import org.apache.qpid.jms.JmsSendTimedOutException; +import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; @@ -442,7 +442,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { originalPeer.dropAfterLastHandler(); final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); - connection.getPrefetchPolicy().setQueuePrefetch(0); + ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0); connection.addConnectionListener(new JmsDefaultConnectionListener() { @Override public void onConnectionEstablished(URI remoteURI) { @@ -515,7 +515,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { originalPeer.dropAfterLastHandler(); final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); - connection.getPrefetchPolicy().setQueuePrefetch(0); + ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0); connection.addConnectionListener(new JmsDefaultConnectionListener() { @Override public void onConnectionEstablished(URI remoteURI) { @@ -591,7 +591,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { originalPeer.dropAfterLastHandler(); final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); - connection.getPrefetchPolicy().setQueuePrefetch(0); + ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0); connection.addConnectionListener(new JmsDefaultConnectionListener() { @Override public void onConnectionEstablished(URI remoteURI) { @@ -689,7 +689,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { originalPeer.expectBegin(); originalPeer.expectQueueBrowserAttach(); originalPeer.expectLinkFlow(); - originalPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + originalPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); originalPeer.dropAfterLastHandler(); // --- Post Failover Expectations of FinalPeer --- // @@ -698,9 +698,9 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { finalPeer.expectBegin(); finalPeer.expectBegin(); finalPeer.expectQueueBrowserAttach(); - finalPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); - finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); - finalPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + finalPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); + finalPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); finalPeer.expectDetach(true, true, true); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java index 5cb45db..86e9a43 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java @@ -34,6 +34,7 @@ import javax.jms.TextMessage; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.support.AmqpTestSupport; import org.junit.Test; import org.slf4j.Logger; @@ -144,7 +145,7 @@ public class JmsQueueBrowserTest extends AmqpTestSupport { connection = createAmqpConnection(); JmsConnection jmsConnection = (JmsConnection) connection; - jmsConnection.getPrefetchPolicy().setQueueBrowserPrefetch(1); + ((JmsDefaultPrefetchPolicy) jmsConnection.getPrefetchPolicy()).setQueueBrowserPrefetch(1); connection.start(); @@ -175,7 +176,7 @@ public class JmsQueueBrowserTest extends AmqpTestSupport { connection = createAmqpConnection(); JmsConnection jmsConnection = (JmsConnection) connection; - jmsConnection.getPrefetchPolicy().setQueueBrowserPrefetch(0); + ((JmsDefaultPrefetchPolicy) jmsConnection.getPrefetchPolicy()).setQueueBrowserPrefetch(0); connection.start(); @@ -274,7 +275,7 @@ public class JmsQueueBrowserTest extends AmqpTestSupport { @Test(timeout = 60000) public void testBrowseAllInQueueSmallPrefetch() throws Exception { connection = createAmqpConnection(); - ((JmsConnection) connection).getPrefetchPolicy().setQueueBrowserPrefetch(10); + ((JmsDefaultPrefetchPolicy) ((JmsConnection) connection).getPrefetchPolicy()).setQueueBrowserPrefetch(1); connection.start(); final int MSG_COUNT = 30; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java index f56fbfa..9e780fb 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java @@ -30,6 +30,7 @@ import javax.jms.TextMessage; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy; import org.apache.qpid.jms.support.AmqpTestSupport; import org.apache.qpid.jms.support.Wait; import org.junit.Test; @@ -39,10 +40,14 @@ import org.junit.Test; */ public class JmsZeroPrefetchTest extends AmqpTestSupport { + @Override + public String getAmqpConnectionURIOptions() { + return "jms.prefetchPolicy.all=0"; + } + @Test(timeout = 60000) public void testBlockingReceivesUnBlocksOnMessageSend() throws Exception { connection = createAmqpConnection(); - ((JmsConnection)connection).getPrefetchPolicy().setAll(0); connection.start(); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -83,7 +88,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { @Test(timeout = 60000) public void testReceiveTimesOutAndRemovesCredit() throws Exception { connection = createAmqpConnection(); - ((JmsConnection)connection).getPrefetchPolicy().setAll(0); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -113,7 +117,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { @Test(timeout = 60000) public void testReceiveNoWaitWaitForSever() throws Exception { connection = createAmqpConnection(); - ((JmsConnection)connection).getPrefetchPolicy().setAll(0); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -146,7 +149,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { @Test(timeout = 60000) public void testRepeatedPullAttempts() throws Exception { connection = createAmqpConnection(); - ((JmsConnection)connection).getPrefetchPolicy().setAll(0); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -169,7 +171,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { @Test(timeout = 60000) public void testPullConsumerOnlyRequestsOneMessage() throws Exception { connection = createAmqpConnection(); - ((JmsConnection)connection).getPrefetchPolicy().setAll(0); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -205,7 +206,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { @Test(timeout = 60000) public void testTwoConsumers() throws Exception { connection = createAmqpConnection(); - ((JmsConnection)connection).getPrefetchPolicy().setAll(0); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -232,7 +232,6 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { @Test(timeout = 60000) public void testConsumerWithNoMessageDoesNotHogMessages() throws Exception { connection = createAmqpConnection(); - ((JmsConnection)connection).getPrefetchPolicy().setAll(0); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -276,11 +275,13 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { session.rollback(); session.close(); + JmsDefaultRedeliveryPolicy redeliveryPolicy = new JmsDefaultRedeliveryPolicy(); + redeliveryPolicy.setMaxRedeliveries(0); + // Reconnect with zero prefetch and zero redeliveries allowed. connection.close(); connection = createAmqpConnection(); - ((JmsConnection)connection).getPrefetchPolicy().setAll(0); - ((JmsConnection)connection).getRedeliveryPolicy().setMaxRedeliveries(0); + ((JmsConnection)connection).setRedeliveryPolicy(redeliveryPolicy); connection.start(); // try consume with timeout - expect it to timeout and return NULL message http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5f00d23/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java index e5b7766..dd33bdc 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java @@ -39,7 +39,6 @@ import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.support.AmqpTestSupport; import org.apache.qpid.jms.support.Wait; @@ -339,7 +338,7 @@ public class JmsFailoverTest extends AmqpTestSupport { @Test(timeout = 30000) public void testPullConsumerTimedReceiveRecovers() throws Exception { - URI brokerURI = new URI(getAmqpFailoverURI()); + URI brokerURI = new URI(getAmqpFailoverURI() + "?jms.prefetchPolicy.all=0"); final CountDownLatch started = new CountDownLatch(1); final CountDownLatch received = new CountDownLatch(1); @@ -347,10 +346,6 @@ public class JmsFailoverTest extends AmqpTestSupport { connection = createAmqpConnection(brokerURI); connection.start(); - // Make all our consumers pull consumers. - JmsConnection jmsConnection = (JmsConnection) connection; - jmsConnection.getPrefetchPolicy().setAll(0); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(name.getMethodName()); final MessageConsumer consumer = session.createConsumer(queue); @@ -403,7 +398,7 @@ public class JmsFailoverTest extends AmqpTestSupport { @Test(timeout = 30000) public void testPullConsumerReceiveRecovers() throws Exception { - URI brokerURI = new URI(getAmqpFailoverURI()); + URI brokerURI = new URI(getAmqpFailoverURI() + "?jms.prefetchPolicy.all=0"); final CountDownLatch started = new CountDownLatch(1); final CountDownLatch received = new CountDownLatch(1); @@ -411,10 +406,6 @@ public class JmsFailoverTest extends AmqpTestSupport { connection = createAmqpConnection(brokerURI); connection.start(); - // Make all our consumers pull consumers. - JmsConnection jmsConnection = (JmsConnection) connection; - jmsConnection.getPrefetchPolicy().setAll(0); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(name.getMethodName()); final MessageConsumer consumer = session.createConsumer(queue); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
