Repository: qpid-proton-j
Updated Branches:
  refs/heads/master 84b3ce477 -> e5a7dcade


PROTON-1828: add ability to limit outgoing frame sizes


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/e5a7dcad
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/e5a7dcad
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/e5a7dcad

Branch: refs/heads/master
Commit: e5a7dcade2996b2b68967949ddf1377f954bf579
Parents: 84b3ce4
Author: Robbie Gemmell <rob...@apache.org>
Authored: Fri Apr 13 16:03:27 2018 +0100
Committer: Robbie Gemmell <rob...@apache.org>
Committed: Fri Apr 13 16:03:27 2018 +0100

----------------------------------------------------------------------
 .../apache/qpid/proton/engine/Transport.java    |  13 ++
 .../qpid/proton/engine/impl/TransportImpl.java  |  20 ++-
 .../proton/engine/impl/TransportImplTest.java   | 130 ++++++++++++++++++-
 3 files changed, 161 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/e5a7dcad/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
index 35b2d50..f8de042 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
@@ -308,4 +308,17 @@ public interface Transport extends Endpoint
     void setEmitFlowEventOnSend(boolean emitFlowEventOnSend);
 
     boolean isEmitFlowEventOnSend();
+
+    /**
+     * Set an upper limit on the size of outgoing frames that will be sent
+     * to the peer. Allows constraining the transport not to emit Transfer
+     * frames over a given size even when the peer's max frame size allows it.
+     *
+     * Must be set before receiving the peer's Open frame to take effect.
+     *
+     * @param size the size limit to apply
+     */
+    void setOutboundFrameSizeLimit(int size);
+
+    int getOutboundFrameSizeLimit();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/e5a7dcad/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 1d0103e..afadb5f 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -104,6 +104,7 @@ public class TransportImpl extends EndpointImpl
 
     private int _maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
     private int _remoteMaxFrameSize = MIN_MAX_FRAME_SIZE;
+    private int _outboundFrameSizeLimit = 0;
     private int _channelMax       = CHANNEL_MAX_LIMIT;
     private int _remoteChannelMax = CHANNEL_MAX_LIMIT;
 
@@ -1105,12 +1106,19 @@ public class TransportImpl extends EndpointImpl
             _open = open;
         }
 
+        int effectiveMaxFrameSize = _remoteMaxFrameSize;
         if(open.getMaxFrameSize().longValue() > 0)
         {
             _remoteMaxFrameSize = (int) open.getMaxFrameSize().longValue();
-            _frameWriter.setMaxFrameSize(_remoteMaxFrameSize);
+            effectiveMaxFrameSize = (int) 
Math.min(open.getMaxFrameSize().longValue(), Integer.MAX_VALUE);
         }
 
+        if(_outboundFrameSizeLimit > 0) {
+            effectiveMaxFrameSize = (int) 
Math.min(open.getMaxFrameSize().longValue(), _outboundFrameSizeLimit);
+        }
+
+        _frameWriter.setMaxFrameSize(effectiveMaxFrameSize);
+
         if (open.getChannelMax().longValue() > 0)
         {
             _remoteChannelMax = (int) open.getChannelMax().longValue();
@@ -1779,4 +1787,14 @@ public class TransportImpl extends EndpointImpl
             _additionalTransportLayers.add(layer);
         }
     }
+
+    @Override
+    public void setOutboundFrameSizeLimit(int limit) {
+        _outboundFrameSizeLimit = limit;
+    }
+
+    @Override
+    public int getOutboundFrameSizeLimit() {
+        return _outboundFrameSizeLimit;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/e5a7dcad/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
 
b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index ead411f..50c04fd 100644
--- 
a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ 
b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
+import java.util.Random;
 
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
@@ -78,7 +79,7 @@ public class TransportImplTest
     private static final TransportFrame TRANSPORT_FRAME_BEGIN = new 
TransportFrame(CHANNEL_ID, new Begin(), null);
     private static final TransportFrame TRANSPORT_FRAME_OPEN = new 
TransportFrame(CHANNEL_ID, new Open(), null);
 
-    private static final int BUFFER_SIZE = 4096;
+    private static final int BUFFER_SIZE = 8 * 1024;
 
     @Rule
     public ExpectedException _expectedException = ExpectedException.none();
@@ -2503,6 +2504,122 @@ public class TransportImplTest
         }
     }
 
+    @Test
+    public void testMaxFrameSizeOfPeerHasEffect()
+    {
+        doMaxFrameSizeTestImpl(0, 0, 5700, 1);
+        doMaxFrameSizeTestImpl(1024, 0, 5700, 6);
+    }
+
+    @Test
+    public void testMaxFrameSizeOutgoingFrameSizeLimitHasEffect()
+    {
+        doMaxFrameSizeTestImpl(0, 512, 5700, 12);
+        doMaxFrameSizeTestImpl(1024, 512, 5700, 12);
+        doMaxFrameSizeTestImpl(1024, 2048, 5700, 6);
+    }
+
+    void doMaxFrameSizeTestImpl(int remoteMaxFrameSize, int 
outboundFrameSizeLimit, int contentLength, int expectedNumFrames)
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+
+        // If we have been given an outboundFrameSizeLimit, configure it
+        if(outboundFrameSizeLimit != 0) {
+            transport.setOutboundFrameSizeLimit(outboundFrameSizeLimit);
+        }
+
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "mySender";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        String messageContent = createLargeContent(contentLength);
+        sendMessage(sender, "tag1", messageContent);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 0, transport.writes.size());
+
+        // Now open the connection, expect the Open, Begin and Attach frames
+        // but no Transfers, as the sender has not been given credit yet.
+        connection.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof 
Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof 
Attach);
+
+        // Send the necessary responses to open/begin/attach then give sender 
credit
+        Open open = new Open();
+        if(remoteMaxFrameSize != 0) {
+            open.setMaxFrameSize(UnsignedInteger.valueOf(remoteMaxFrameSize));
+        }
+        transport.handleFrame(new TransportFrame(0, open, null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        Flow flow = new Flow();
+        flow.setHandle(UnsignedInteger.ZERO);
+        flow.setDeliveryCount(UnsignedInteger.ZERO);
+        flow.setNextIncomingId(UnsignedInteger.ONE);
+        flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        flow.setLinkCredit(UnsignedInteger.valueOf(10));
+
+        transport.handleFrame(new TransportFrame(0, flow, null));
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        // Now pump the transport again and expect transfers for the message
+        pumpMockTransport(transport);
+
+        // This calc isn't entirely precise, there is some added 
performative/frame overhead not
+        // accounted for...but values are chosen to work, and verified here.
+        final int frameCount;
+        if(remoteMaxFrameSize == 0 && outboundFrameSizeLimit == 0) {
+            frameCount = 1;
+        } else if(remoteMaxFrameSize == 0 && outboundFrameSizeLimit != 0) {
+            frameCount = (int) Math.ceil((double)contentLength / (double) 
outboundFrameSizeLimit);
+        } else {
+            int effectiveMaxFrameSize;
+            if(outboundFrameSizeLimit != 0) {
+                effectiveMaxFrameSize = Math.min(outboundFrameSizeLimit, 
remoteMaxFrameSize);
+            } else {
+                effectiveMaxFrameSize = remoteMaxFrameSize;
+            }
+
+            frameCount = (int) Math.ceil((double)contentLength / (double) 
effectiveMaxFrameSize);
+        }
+
+        assertEquals("Unexpected number of frames calculated", 
expectedNumFrames, frameCount);
+
+        final int start = 3;
+        final int totalExpected = start + frameCount;
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), totalExpected, transport.writes.size());
+        for(int i = start; i < totalExpected; i++) {
+            assertTrue("Unexpected frame type", transport.writes.get(i) 
instanceof Transfer);
+        }
+    }
+
     private void processInput(MockTransportImpl transport, ByteBuffer data) {
         while (data.remaining() > 0)
         {
@@ -2515,4 +2632,15 @@ public class TransportImplTest
         }
     }
 
+
+    private static String createLargeContent(int length) {
+        Random rand = new Random(System.currentTimeMillis());
+
+        byte[] payload = new byte[length];
+        for (int i = 0; i < length; i++) {
+            payload[i] = (byte) (64 + 1 + rand.nextInt(9));
+        }
+
+        return new String(payload, StandardCharsets.UTF_8);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to