Repository: qpid-jms Updated Branches: refs/heads/master 7da52038a -> f4b709209
QPIDJMS-84, QPIDJMS-80: update to 0.10-SNAPSHOT, get session window handling changes, make session outgoing window configurable if needed Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f4b70920 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f4b70920 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f4b70920 Branch: refs/heads/master Commit: f4b709209fbea24cddab35244e1772bca659fff2 Parents: 7da5203 Author: Robert Gemmell <[email protected]> Authored: Thu Jul 9 15:50:17 2015 +0100 Committer: Robert Gemmell <[email protected]> Committed: Thu Jul 9 15:50:17 2015 +0100 ---------------------------------------------------------------------- pom.xml | 2 +- .../qpid/jms/provider/amqp/AmqpProvider.java | 15 ++++ .../qpid/jms/provider/amqp/AmqpSession.java | 10 ++- .../BytesMessageIntegrationTest.java | 8 +- .../integration/ConnectionIntegrationTest.java | 10 +-- .../integration/ConsumerIntegrationTest.java | 6 +- .../ForeignMessageIntegrationTest.java | 2 +- .../integration/IdleTimeoutIntegrationTest.java | 12 +-- .../jms/integration/IntegrationTestFixture.java | 2 +- .../integration/MapMessageIntegrationTest.java | 4 +- .../jms/integration/MessageIntegrationTest.java | 48 +++++------ .../ObjectMessageIntegrationTest.java | 10 +-- .../integration/ProducerIntegrationTest.java | 28 +++--- .../jms/integration/SaslIntegrationTest.java | 8 +- .../jms/integration/SessionIntegrationTest.java | 91 +++++++++++++------- .../StreamMessageIntegrationTest.java | 4 +- .../integration/TextMessageIntegrationTest.java | 10 +-- .../jms/provider/amqp/AmqpProviderTest.java | 2 +- .../provider/failover/FailoverRedirectTest.java | 6 +- .../qpid/jms/test/testpeer/TestAmqpPeer.java | 23 +++-- 20 files changed, 178 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/pom.xml 
---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 22bce64..d663ef1 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,7 @@ <target-version>1.7</target-version> <!-- Dependency Versions for this Project --> - <proton-version>0.9.1</proton-version> + <proton-version>0.10-SNAPSHOT</proton-version> <netty-version>4.0.17.Final</netty-version> <slf4j-version>1.7.12</slf4j-version> <geronimo-jms-1-1-spec-version>1.1.1</geronimo-jms-1-1-spec-version> http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index fe46fe0..a927bf0 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -117,6 +117,7 @@ public class AmqpProvider implements Provider, TransportListener { private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT; private int channelMax = DEFAULT_CHANNEL_MAX; private int idleTimeout = 60000; + private long sessionOutoingWindow = -1; //Use proton default private final URI remoteURI; private final AtomicBoolean closed = new AtomicBoolean(); @@ -954,6 +955,20 @@ public class AmqpProvider implements Provider, TransportListener { this.idleTimeout = idleTimeout; } + public long getSessionOutgoingWindow() { + return sessionOutoingWindow; + } + + /** + * Sets the outgoing window size for the AMQP session. Values may + * be between -1 and 2^32-1, where -1 indicates to use the default. 
+ * + * @param sessionOutoingWindow the outgoing window size + */ + public void setSessionOutgoingWindow(long sessionOutoingWindow) { + this.sessionOutoingWindow = sessionOutoingWindow; + } + public long getCloseTimeout() { return this.closeTimeout; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index 27d018e..694ce2c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -65,8 +65,16 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> { @Override protected void doOpen() { - this.getEndpoint().setIncomingCapacity(Integer.MAX_VALUE); + long outgoingWindow = getProvider().getSessionOutgoingWindow(); + + Session session = this.getEndpoint(); + session.setIncomingCapacity(Integer.MAX_VALUE); + if(outgoingWindow >= 0) { + session.setOutgoingWindow(outgoingWindow); + } + this.connection.addSession(this); + super.doOpen(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java index a0a8d05..540da39 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java 
@@ -61,7 +61,7 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase { public void testSendBasicBytesMessageWithContent() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -115,7 +115,7 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -164,7 +164,7 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -272,7 +272,7 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index 200e4f4..6cf87c6 100644 --- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -92,7 +92,7 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { public void testCreateAutoAckSession() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull("Session should not be null", session); } @@ -103,7 +103,7 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); // Expect the session, with an immediate link to the transaction coordinator // using a target with the expected capabilities only. 
CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); @@ -136,7 +136,7 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS)); testPeer.expectSaslAnonymousConnect(null, null, connPropsMatcher, null); - testPeer.expectBegin(true); + testPeer.expectBegin(); ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?jms.clientID=foo"); Connection connection = factory.createConnection(); @@ -169,7 +169,7 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { testPeer.expectSaslAnonymousConnect(null, hostnameMatcher); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); String uri = "amqp://localhost:" + testPeer.getServerPort(); if(setHostnameOption) { @@ -273,7 +273,7 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { final Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a consumer, then remotely end the connection afterwards. 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java index ae66557..55e7df6 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java @@ -43,7 +43,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { public void testCloseConsumer() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectReceiverAttach(); testPeer.expectLinkFlow(); @@ -65,7 +65,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a consumer, then remotely end it afterwards. 
@@ -112,7 +112,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ForeignMessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ForeignMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ForeignMessageIntegrationTest.java index 77c2b05..fcb7d89 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ForeignMessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ForeignMessageIntegrationTest.java @@ -45,7 +45,7 @@ public class ForeignMessageIntegrationTest extends QpidJmsTestCase { public void testSendForeignBytesMessageWithContent() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java index 6c1556c..66280ff 100644 --- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java @@ -54,7 +54,7 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { testPeer.expectSaslAnonymousConnect(greaterThan(UnsignedInteger.valueOf(0)), null); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort()); Connection connection = factory.createConnection(); @@ -77,7 +77,7 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { testPeer.expectSaslAnonymousConnect(equalTo(UnsignedInteger.valueOf(advertisedValue)), null); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout); Connection connection = factory.createConnection(); @@ -103,7 +103,7 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { testPeer.expectSaslAnonymousConnect(); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort()); Connection connection = factory.createConnection(); @@ -141,7 +141,7 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { testPeer.expectSaslAnonymousConnect(); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort()); Connection 
connection = factory.createConnection(); @@ -170,7 +170,7 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { testPeer.expectSaslAnonymousConnect(); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout); final JmsConnection connection = (JmsConnection) factory.createConnection(); @@ -205,7 +205,7 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { testPeer.expectSaslAnonymousConnect(); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); // Start to emit idle frames when the connection is set up, this should stop it timing out testPeer.runAfterLastHandler(new EmptyFrameSender(latch, period, cycles, testPeer)); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java index 8dd148c..c0a3cbc 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java @@ -59,7 +59,7 @@ public class IntegrationTestFixture { testPeer.expectSaslPlainConnect("guest", "guest", desiredCapabilities, serverCapabilities, serverProperties); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); String scheme = ssl ? 
"amqps" : "amqp"; final String baseURI = scheme + "://localhost:" + testPeer.getServerPort(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java index a31b37a..ed54bf1 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java @@ -64,7 +64,7 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -151,7 +151,7 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase { public void testSendBasicMapMessage() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java index 8206d72..ad7dd36 100644 --- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java @@ -105,7 +105,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase public void testSendMessageWithApplicationProperties() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -162,7 +162,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -228,7 +228,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer, "?jms.validatePropertyNames=" + !disableValidation); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); String invalidPropName = "invalid-name"; String value = "valueA"; @@ -280,7 +280,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer, "?jms.validatePropertyNames=" + !disableValidation); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); String invalidPropName = "invalid-name"; String value = "valueA"; @@ -351,7 +351,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); String queueName = "myQueue"; String topicName = "myTopic"; @@ -402,7 +402,7 @@ public class MessageIntegrationTest extends 
QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -591,7 +591,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase ((JmsConnection) connection).setQueuePrefix(destPrefix); } - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -751,7 +751,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase ((JmsConnection) connection).setQueuePrefix(destPrefix); } - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -865,7 +865,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer, null, null, properties); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -976,7 +976,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1038,7 +1038,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("myTopic"); @@ -1077,7 +1077,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase public void testSentMessageContainsToTypeAnnotationByte() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = 
testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1115,7 +1115,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1161,7 +1161,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -1203,7 +1203,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -1246,7 +1246,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -1279,7 +1279,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -1314,7 +1314,7 @@ public class 
MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -1374,7 +1374,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -1441,7 +1441,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -1543,7 +1543,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase private void sentMessageWithCorrelationIdTestImpl(String stringCorrelationId, Object correlationIdForAmqpMessageClass, boolean appSpecific) throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1628,7 +1628,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -1695,7 +1695,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Connection connection = 
testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -1768,7 +1768,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase public void testSendMessageWithGroupRelatedPropertiesSet() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java index b31ad06..6ab18e6 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java @@ -80,7 +80,7 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase private void doSendBasicObjectMessageWithSerializedContentTestImpl(String content, boolean setObjectIfNull) throws JMSException, IOException, InterruptedException, Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -124,7 +124,7 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase Connection 
connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -170,7 +170,7 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -230,7 +230,7 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase public void testSendBasicObjectMessageWithAmqpTypedContent() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -269,7 +269,7 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index bd25982..5220d63 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -61,7 +61,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testCloseSender() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -79,7 +79,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testDefaultDeliveryModeProducesDurableMessages() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -108,7 +108,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testProducerOverridesMessageDeliveryMode() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -146,7 +146,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testSendingMessageSetsJMSDestination() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -181,7 +181,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testSendingMessageSetsJMSTimestamp() throws Exception { try (TestAmqpPeer testPeer 
= new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -225,7 +225,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testSendingMessageWithDisableMessageTimestampHint() throws Exception { try(TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -262,7 +262,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testSendingMessageSetsJMSExpirationRelatedAbsoluteExpiryAndTtlFields() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -315,7 +315,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void sendingMessageWithJMS_AMQP_TTLSetTestImpl(long jmsTtl, long amqpTtl) throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -367,7 +367,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testDefaultPriorityProducesMessagesWithoutPriorityField() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); 
testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -403,7 +403,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testNonDefaultPriorityProducesMessagesWithPriorityFieldAndSetsJMSPriority() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -441,7 +441,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testSendingMessageSetsJMSMessageID() throws Exception { try(TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -487,7 +487,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testSendingMessageWithDisableMessageIDHint() throws Exception { try(TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -527,7 +527,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a producer, then remotely end it afterwards. 
@@ -567,7 +567,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { public void testSendWhenLinkCreditIsDelayed() throws Exception { try(TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer, "?amqp.traceFrames=true&amqp.traceBytes=true"); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String queueName = "myQueue"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java index db5d85a..4f9c299 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java @@ -70,7 +70,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase { // Expect an EXTERNAL connection testPeer.expectSaslExternalConnect(); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); ConnectionFactory factory = new JmsConnectionFactory("amqps://localhost:" + testPeer.getServerPort() + connOptions); Connection connection = factory.createConnection(); @@ -95,7 +95,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase { testPeer.expectSaslPlainConnect(user, pass, null, null); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort()); Connection connection = factory.createConnection(user, pass); @@ -116,7 +116,7 
@@ public class SaslIntegrationTest extends QpidJmsTestCase { // Expect an ANOYMOUS connection testPeer.expectSaslAnonymousConnect(); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort()); Connection connection = factory.createConnection(); @@ -229,7 +229,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase { // Expect a connection with no SASL layer. testPeer.expectSaslLayerDisabledConnect(); // Each connection creates a session for managing temporary destinations etc - testPeer.expectBegin(true); + testPeer.expectBegin(); ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.saslLayer=false"); Connection connection = factory.createConnection(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index a421736..ec6094b 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -90,7 +90,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { public void testCloseSession() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull("Session should not be null", session); testPeer.expectEnd(); @@ 
-102,7 +102,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { public void testCreateProducer() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -117,7 +117,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { public void testCreateProducerLinkSupportsAcceptedAndRejectedOutcomes() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -145,7 +145,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -174,7 +174,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String topicName = "myTopic"; @@ -228,7 +228,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try { @@ -259,7 +259,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = 
connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String dynamicAddress = "myTempQueueAddress"; @@ -280,7 +280,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String dynamicAddress = "myTempQueueAddress"; @@ -301,7 +301,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String dynamicAddress = "myTempTopicAddress"; @@ -322,7 +322,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String dynamicAddress = "myTempTopicAddress"; @@ -360,7 +360,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { private void doCreateConsumerSourceContainsCapabilityTestImpl(Class<? extends Destination> destType) throws JMSException, Exception, IOException { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -421,7 +421,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { private void doCreateProducerTargetContainsCapabilityTestImpl(Class<? 
extends Destination> destType) throws JMSException, Exception, IOException { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -466,7 +466,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //Expect and accept a link to the anonymous relay node, check it has no type capability @@ -514,7 +514,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String destName = "myDest"; @@ -577,7 +577,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String topicName = "myTopic"; @@ -603,7 +603,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer, false, null, null, null, false); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String topicName = "myTopic"; @@ -628,7 +628,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session 
= connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String topicName = "myTopic"; @@ -656,7 +656,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String topicName = "myTopic"; @@ -713,7 +713,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //Expect and refuse a link to the anonymous relay node @@ -752,7 +752,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String topicName = "myTopic"; @@ -789,7 +789,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); String topicName = "myTopic"; @@ -848,7 +848,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); @@ -908,7 +908,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { ((JmsConnection) connection).getRedeliveryPolicy().setMaxRedeliveries(1); - 
testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -939,7 +939,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); @@ -995,7 +995,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); @@ -1065,7 +1065,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); @@ -1139,7 +1139,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); @@ -1216,7 +1216,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); @@ -1248,7 +1248,7 @@ 
public class SessionIntegrationTest extends QpidJmsTestCase { ((JmsConnection) connection).getPrefetchPolicy().setAll(newPrefetch); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -1269,7 +1269,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a producer, then remotely end the session afterwards. @@ -1323,7 +1323,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a consumer, then remotely end the session afterwards. 
@@ -1375,7 +1375,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // Create a consumer @@ -1402,7 +1402,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1425,4 +1425,29 @@ public class SessionIntegrationTest extends QpidJmsTestCase { connection.close(); } } + + @Test(timeout = 20000) + public void testSessionHasExpectedDefaultOutgoingWindow() throws Exception { + doSessionHasExpectedOutgoingWindowTestImpl(Integer.MAX_VALUE, null); + } + + @Test(timeout = 20000) + public void testSessionHasExpectedConfiguredOutgoingWindow() throws Exception { + int windowSize = 13579; + doSessionHasExpectedOutgoingWindowTestImpl(windowSize, "?amqp.sessionOutgoingWindow=" + windowSize); + } + + private void doSessionHasExpectedOutgoingWindowTestImpl(int value, String options) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, options); + + testPeer.expectBegin(equalTo(UnsignedInteger.valueOf(value))); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + assertNotNull("Session should not be null", session); + + testPeer.expectClose(); + connection.close(); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java index 822d4e9..f3f86de 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java @@ -64,7 +64,7 @@ public class StreamMessageIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -147,7 +147,7 @@ public class StreamMessageIntegrationTest extends QpidJmsTestCase { public void testSendBasicMapMessage() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java index 20163a5..935e721 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java @@ -57,7 +57,7 @@ public class TextMessageIntegrationTest extends QpidJmsTestCase { public void testSendTextMessage() throws 
Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -88,7 +88,7 @@ public class TextMessageIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -114,7 +114,7 @@ public class TextMessageIntegrationTest extends QpidJmsTestCase { public void testSendTextMessageWithoutContent() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); - testPeer.expectBegin(true); + testPeer.expectBegin(); testPeer.expectSenderAttach(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -144,7 +144,7 @@ public class TextMessageIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); @@ -219,7 +219,7 @@ public class TextMessageIntegrationTest extends QpidJmsTestCase { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); - testPeer.expectBegin(true); + testPeer.expectBegin(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myQueue"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java ---------------------------------------------------------------------- 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java index dea89e9..2a74bd2 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java @@ -157,7 +157,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { provider = new AmqpProvider(peerURI); testPeer.expectSaslPlainConnect(TEST_USERNAME, TEST_PASSWORD, null, null); - testPeer.expectBegin(true); + testPeer.expectBegin(); provider.connect(); testPeer.expectClose(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java index 8b3baeb..8ed88e5 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java @@ -61,7 +61,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase { LOG.info("Backup peer is at: {}", redirectURI); redirectedPeer.expectSaslAnonymousConnect(); - redirectedPeer.expectBegin(true); + redirectedPeer.expectBegin(); Map<Symbol, Object> redirectInfo = new HashMap<Symbol, Object>(); redirectInfo.put(OPEN_HOSTNAME, "localhost"); @@ -104,7 +104,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase { LOG.info("Primary is at {}: Backup peer is at: {}", rejectingURI, redirectURI); redirectedPeer.expectSaslAnonymousConnect(); - redirectedPeer.expectBegin(true); + redirectedPeer.expectBegin(); Map<Symbol, Object> 
redirectInfo = new HashMap<Symbol, Object>(); redirectInfo.put(OPEN_HOSTNAME, "localhost"); @@ -112,7 +112,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase { redirectInfo.put(PORT, redirectedPeer.getServerPort()); rejectingPeer.expectSaslAnonymousConnect(); - rejectingPeer.expectBegin(true); + rejectingPeer.expectBegin(); rejectingPeer.remotelyCloseConnection(true, ConnectionError.REDIRECT, "Server is full, go away", redirectInfo); final JmsConnection connection = establishAnonymousConnecton(rejectingPeer); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f4b70920/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index be7282c..e72f9d2 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -601,13 +601,25 @@ public class TestAmqpPeer implements AutoCloseable addHandler(closeMatcher); } - public void expectBegin(boolean expectSessionFlow) + public void expectBegin() + { + expectBegin(notNullValue()); + } + + public void expectBegin(Matcher<?> outgoingWindowMatcher) { final BeginMatcher beginMatcher = new BeginMatcher() .withRemoteChannel(nullValue()) .withNextOutgoingId(equalTo(UnsignedInteger.ONE)) - .withIncomingWindow(notNullValue()) - .withOutgoingWindow(notNullValue()); + .withIncomingWindow(notNullValue()); + if(outgoingWindowMatcher != null) + { + beginMatcher.withOutgoingWindow(outgoingWindowMatcher); + } + else + { + beginMatcher.withOutgoingWindow(notNullValue()); + } // The response will have its remoteChannel field dynamically set based on incoming value final BeginFrame beginResponse = new BeginFrame() @@ -634,11 +646,6 @@ public
class TestAmqpPeer implements AutoCloseable beginMatcher.onCompletion(beginResponseSender); addHandler(beginMatcher); - - if(expectSessionFlow) - { - expectSessionFlow(); - } } public void expectEnd() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
