Repository: qpid-jms
Updated Branches:
  refs/heads/master d7db08f54 -> 63d56d282


Add an option to group multiple frames into a single attempted TCP write, and
use it to optionally defer the attach response until the detach is ready too.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2d617c82
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2d617c82
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2d617c82

Branch: refs/heads/master
Commit: 2d617c8252c976aa69e2d1bcff08d96c0321da4e
Parents: d7db08f
Author: Robert Gemmell <[email protected]>
Authored: Tue Nov 11 10:41:38 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Tue Nov 11 09:57:51 2014 +0000

----------------------------------------------------------------------
 .../jms/integration/SessionIntegrationTest.java | 19 +++++--
 .../qpid/jms/test/testpeer/FrameSender.java     |  9 +++-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 52 +++++++++++++++++---
 3 files changed, 69 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2d617c82/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 9181af9..ef4dcdd 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -26,6 +26,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -158,7 +160,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase 
{
             targetMatcher.withDynamic(nullValue());//default = false
             targetMatcher.withDurable(nullValue());//default = none/0
 
-            testPeer.expectSenderAttach(targetMatcher, false);
+            testPeer.expectSenderAttach(targetMatcher, false, false);
 
             //Create an anonymous producer
             MessageProducer producer = session.createProducer(null);
@@ -180,7 +182,16 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout = 5000)
-    public void testCreateProducerFailsWhenLinkRefused() throws Exception {
+    public void 
testCreateProducerFailsWhenLinkRefusedAndAttachFrameWriteIsNotDeferred() throws 
Exception {
+        doCreateProducerFailsWhenLinkRefusedTestImpl(false);
+    }
+
+    @Test(timeout = 5000)
+    public void 
testCreateProducerFailsWhenLinkRefusedAndAttachFrameWriteIsDeferred() throws 
Exception {
+        doCreateProducerFailsWhenLinkRefusedTestImpl(true);
+    }
+
+    private void doCreateProducerFailsWhenLinkRefusedTestImpl(boolean 
deferAttachFrameWrite) throws JMSException, InterruptedException, Exception, 
IOException {
         try (TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);) {
             Connection connection = testFixture.establishConnecton(testPeer);
             connection.start();
@@ -197,7 +208,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase 
{
             targetMatcher.withDynamic(nullValue());//default = false
             targetMatcher.withDurable(nullValue());//default = none/0
 
-            testPeer.expectSenderAttach(targetMatcher, true);
+            testPeer.expectSenderAttach(targetMatcher, true, 
deferAttachFrameWrite);
 
             try {
                 //Create a producer, expect it to throw exception due to the 
link-refusal
@@ -230,7 +241,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase 
{
             targetMatcher.withDynamic(nullValue());//default = false
             targetMatcher.withDurable(nullValue());//default = none/0
 
-            testPeer.expectSenderAttach(targetMatcher, true);
+            testPeer.expectSenderAttach(targetMatcher, true, false);
 
             //Create an anonymous producer
             MessageProducer producer = session.createProducer(null);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2d617c82/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java
index 7099bee..1a139cd 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java
@@ -28,6 +28,7 @@ class FrameSender implements AmqpPeerRunnable
     private final Binary _payload;
     private ValueProvider _valueProvider;
     private int _channel;
+    private boolean _deferWrite = false;
 
     FrameSender(TestAmqpPeer testAmqpPeer, FrameType type, int channel, 
ListDescribedType listDescribedType, Binary payload)
     {
@@ -46,7 +47,7 @@ class FrameSender implements AmqpPeerRunnable
             _valueProvider.setValues();
         }
 
-        _testAmqpPeer.sendFrame(_type, _channel, _listDescribedType, _payload);
+        _testAmqpPeer.sendFrame(_type, _channel, _listDescribedType, _payload, 
_deferWrite);
     }
 
     public FrameSender setValueProvider(ValueProvider valueProvider)
@@ -60,4 +61,10 @@ class FrameSender implements AmqpPeerRunnable
         _channel = channel;
         return this;
     }
+
+    public FrameSender setDeferWrite(boolean deferWrite)
+    {
+        _deferWrite  = deferWrite;
+        return this;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2d617c82/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index e56c32b..8ecbce6 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -100,6 +101,8 @@ public class TestAmqpPeer implements AutoCloseable
 
     private volatile int _nextLinkHandle = 100;
 
+    private byte[] _deferredBytes;
+
     public TestAmqpPeer(int port) throws IOException
     {
         _driverRunnable = new TestAmqpPeerRunner(port, this);
@@ -235,7 +238,7 @@ public class TestAmqpPeer implements AutoCloseable
         _driverRunnable.sendBytes(header);
     }
 
-    public void sendFrame(FrameType type, int channel, DescribedType 
describedType, Binary payload)
+    public void sendFrame(FrameType type, int channel, DescribedType 
describedType, Binary payload, boolean deferWrite)
     {
         if(channel < 0)
         {
@@ -244,7 +247,36 @@ public class TestAmqpPeer implements AutoCloseable
 
         LOGGER.debug("About to send: {}", describedType);
         byte[] output = AmqpDataFramer.encodeFrame(type, channel, 
describedType, payload);
-        _driverRunnable.sendBytes(output);
+
+        if(deferWrite && _deferredBytes == null)
+        {
+            _deferredBytes = output;
+        }
+        else if(_deferredBytes != null)
+        {
+            int newCapacity = _deferredBytes.length + output.length;
+            //TODO: check overflow
+
+            byte[] newOutput = new byte[newCapacity];
+            System.arraycopy(_deferredBytes, 0, newOutput, 0, 
_deferredBytes.length);
+            System.arraycopy(output, 0, newOutput, _deferredBytes.length, 
output.length);
+
+            _deferredBytes = newOutput;
+            output = newOutput;
+        }
+
+        if(deferWrite)
+        {
+            LOGGER.debug("Deferring write until pipelined with future frame 
bytes");
+            return;
+        }
+        else
+        {
+            //clear the deferred bytes to avoid corrupting future sends
+            _deferredBytes = null;
+
+            _driverRunnable.sendBytes(output);
+        }
     }
 
     public void expectAnonymousConnect(boolean authorize)
@@ -265,7 +297,8 @@ public class TestAmqpPeer implements AutoCloseable
                     TestAmqpPeer.this.sendFrame(
                             FrameType.SASL, 0,
                             new 
SaslOutcomeFrame().setCode(UnsignedByte.valueOf((byte)0)),
-                            null);
+                            null,
+                            false);
                     _driverRunnable.expectHeader();
                 }
             }));
@@ -305,7 +338,8 @@ public class TestAmqpPeer implements AutoCloseable
                     TestAmqpPeer.this.sendFrame(
                             FrameType.SASL, 0,
                             new 
SaslOutcomeFrame().setCode(UnsignedByte.valueOf((byte)0)),
-                            null);
+                            null,
+                            false);
                     _driverRunnable.expectHeader();
                 }
             }));
@@ -458,10 +492,10 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectSenderAttach()
     {
-        expectSenderAttach(notNullValue(), false);
+        expectSenderAttach(notNullValue(), false, false);
     }
 
-    public void expectSenderAttach(final Matcher<?> targetMatcher, final 
boolean refuseLink)
+    public void expectSenderAttach(final Matcher<?> targetMatcher, final 
boolean refuseLink, boolean deferAttachResponseWrite)
     {
         final AttachMatcher attachMatcher = new AttachMatcher()
                 .withName(notNullValue())
@@ -497,6 +531,12 @@ public class TestAmqpPeer implements AutoCloseable
             }
         });
 
+        if(deferAttachResponseWrite)
+        {
+            // Defer writing the attach frame until the subsequent frame is 
also ready
+            attachResponseSender.setDeferWrite(true);
+        }
+
         final FlowFrame flowFrame = new 
FlowFrame().setNextIncomingId(UnsignedInteger.ZERO)
                 .setIncomingWindow(UnsignedInteger.valueOf(2048))
                 .setNextOutgoingId(UnsignedInteger.ZERO)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to