Repository: qpid-jms Updated Branches: refs/heads/master d7db08f54 -> 63d56d282
add option to group multiple frames into a single attempted TCP write, use to optionally defer the attach response until the detach is ready too Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2d617c82 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2d617c82 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2d617c82 Branch: refs/heads/master Commit: 2d617c8252c976aa69e2d1bcff08d96c0321da4e Parents: d7db08f Author: Robert Gemmell <[email protected]> Authored: Tue Nov 11 10:41:38 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Tue Nov 11 09:57:51 2014 +0000 ---------------------------------------------------------------------- .../jms/integration/SessionIntegrationTest.java | 19 +++++-- .../qpid/jms/test/testpeer/FrameSender.java | 9 +++- .../qpid/jms/test/testpeer/TestAmqpPeer.java | 52 +++++++++++++++++--- 3 files changed, 69 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2d617c82/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 9181af9..ef4dcdd 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -26,6 +26,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; +import java.io.IOException; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -158,7 +160,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { targetMatcher.withDynamic(nullValue());//default = false targetMatcher.withDurable(nullValue());//default = none/0 - testPeer.expectSenderAttach(targetMatcher, false); + testPeer.expectSenderAttach(targetMatcher, false, false); //Create an anonymous producer MessageProducer producer = session.createProducer(null); @@ -180,7 +182,16 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 5000) - public void testCreateProducerFailsWhenLinkRefused() throws Exception { + public void testCreateProducerFailsWhenLinkRefusedAndAttachFrameWriteIsNotDeferred() throws Exception { + doCreateProducerFailsWhenLinkRefusedTestImpl(false); + } + + @Test(timeout = 5000) + public void testCreateProducerFailsWhenLinkRefusedAndAttachFrameWriteIsDeferred() throws Exception { + doCreateProducerFailsWhenLinkRefusedTestImpl(true); + } + + private void doCreateProducerFailsWhenLinkRefusedTestImpl(boolean deferAttachFrameWrite) throws JMSException, InterruptedException, Exception, IOException { try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); @@ -197,7 +208,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { targetMatcher.withDynamic(nullValue());//default = false targetMatcher.withDurable(nullValue());//default = none/0 - testPeer.expectSenderAttach(targetMatcher, true); + testPeer.expectSenderAttach(targetMatcher, true, deferAttachFrameWrite); try { //Create a producer, expect it to throw exception due to the link-refusal @@ -230,7 +241,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { targetMatcher.withDynamic(nullValue());//default = false targetMatcher.withDurable(nullValue());//default = none/0 - testPeer.expectSenderAttach(targetMatcher, true); + testPeer.expectSenderAttach(targetMatcher, true, false); //Create an anonymous producer MessageProducer producer = session.createProducer(null); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2d617c82/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java index 7099bee..1a139cd 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java @@ -28,6 +28,7 @@ class FrameSender implements AmqpPeerRunnable private final Binary _payload; private ValueProvider _valueProvider; private int _channel; + private boolean _deferWrite = false; FrameSender(TestAmqpPeer testAmqpPeer, FrameType type, int channel, ListDescribedType listDescribedType, Binary payload) { @@ -46,7 +47,7 @@ class FrameSender implements AmqpPeerRunnable _valueProvider.setValues(); } - _testAmqpPeer.sendFrame(_type, _channel, _listDescribedType, _payload); + _testAmqpPeer.sendFrame(_type, _channel, _listDescribedType, _payload, _deferWrite); } public FrameSender setValueProvider(ValueProvider valueProvider) @@ -60,4 +61,10 @@ class FrameSender implements AmqpPeerRunnable _channel = channel; return this; } + + public FrameSender setDeferWrite(boolean deferWrite) + { + _deferWrite = deferWrite; + return this; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2d617c82/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index e56c32b..8ecbce6 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -100,6 +101,8 @@ public class TestAmqpPeer implements AutoCloseable private volatile int _nextLinkHandle = 100; + private byte[] _deferredBytes; + public TestAmqpPeer(int port) throws IOException { _driverRunnable = new TestAmqpPeerRunner(port, this); @@ -235,7 +238,7 @@ public class TestAmqpPeer implements AutoCloseable _driverRunnable.sendBytes(header); } - public void sendFrame(FrameType type, int channel, DescribedType describedType, Binary payload) + public void sendFrame(FrameType type, int channel, DescribedType describedType, Binary payload, boolean deferWrite) { if(channel < 0) { @@ -244,7 +247,36 @@ public class TestAmqpPeer implements AutoCloseable LOGGER.debug("About to send: {}", describedType); byte[] output = AmqpDataFramer.encodeFrame(type, channel, describedType, payload); - _driverRunnable.sendBytes(output); + + if(deferWrite && _deferredBytes == null) + { + _deferredBytes = output; + } + else if(_deferredBytes != null) + { + int newCapacity = _deferredBytes.length + output.length; + //TODO: check overflow + + byte[] newOutput = new byte[newCapacity]; + System.arraycopy(_deferredBytes, 0, newOutput, 0, _deferredBytes.length); + System.arraycopy(output, 0, newOutput, _deferredBytes.length, output.length); + + _deferredBytes = newOutput; + output = newOutput; + } + + if(deferWrite) + { + LOGGER.debug("Deferring write until pipelined with future frame bytes"); + return; + } + else + { + //clear the deferred bytes to avoid corrupting future sends + _deferredBytes = null; + + _driverRunnable.sendBytes(output); + } } public void expectAnonymousConnect(boolean authorize) @@ -265,7 +297,8 @@ public class TestAmqpPeer implements AutoCloseable TestAmqpPeer.this.sendFrame( FrameType.SASL, 0, new SaslOutcomeFrame().setCode(UnsignedByte.valueOf((byte)0)), - null); + null, + false); _driverRunnable.expectHeader(); } })); @@ -305,7 +338,8 @@ public class TestAmqpPeer implements AutoCloseable TestAmqpPeer.this.sendFrame( FrameType.SASL, 0, new SaslOutcomeFrame().setCode(UnsignedByte.valueOf((byte)0)), - null); + null, + false); _driverRunnable.expectHeader(); } })); @@ -458,10 +492,10 @@ public class TestAmqpPeer implements AutoCloseable public void expectSenderAttach() { - expectSenderAttach(notNullValue(), false); + expectSenderAttach(notNullValue(), false, false); } - public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink) + public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite) { final AttachMatcher attachMatcher = new AttachMatcher() .withName(notNullValue()) @@ -497,6 +531,12 @@ public class TestAmqpPeer implements AutoCloseable } }); + if(deferAttachResponseWrite) + { + // Defer writing the attach frame until the subsequent frame is also ready + attachResponseSender.setDeferWrite(true); + } + final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ZERO) .setIncomingWindow(UnsignedInteger.valueOf(2048)) .setNextOutgoingId(UnsignedInteger.ZERO) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
