Repository: qpid-jms Updated Branches: refs/heads/master 43705abfa -> 8f876f926
QPIDJMS-382: limit outgoing transfers to at most the connections configured maxFrameSize Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/8f876f92 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/8f876f92 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/8f876f92 Branch: refs/heads/master Commit: 8f876f9260c131c00d68181acf53f1110fca802b Parents: 43705ab Author: Robbie Gemmell <[email protected]> Authored: Mon Apr 23 19:32:26 2018 +0100 Committer: Robbie Gemmell <[email protected]> Committed: Mon Apr 23 19:32:26 2018 +0100 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpProvider.java | 1 + .../integration/ConnectionIntegrationTest.java | 57 ++++++++++++++++++++ ...ractFrameFieldAndPayloadMatchingHandler.java | 14 ++++- .../qpid/jms/test/testpeer/FrameHandler.java | 2 +- .../FrameWithNoPayloadMatchingHandler.java | 2 +- .../FrameWithPayloadMatchingHandler.java | 11 +++- .../qpid/jms/test/testpeer/TestAmqpPeer.java | 21 ++++++-- .../qpid/jms/test/testpeer/TestFrameParser.java | 2 +- .../test/testpeer/matchers/TransferMatcher.java | 6 ++- .../TransferPayloadCompositeMatcher.java | 32 +++++++++++ qpid-jms-docs/Configuration.md | 2 +- 11 files changed, 138 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 176a3a0..4d0a4bc 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -189,6 +189,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP if (getMaxFrameSize() > 0) { protonTransport.setMaxFrameSize(getMaxFrameSize()); + protonTransport.setOutboundFrameSizeLimit(getMaxFrameSize()); } protonTransport.setChannelMax(getChannelMax()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index 3e12fd9..e560874 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -42,11 +42,13 @@ import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.ExceptionListener; @@ -377,6 +379,61 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testMaxFrameSizeInfluencesOutgoingFrameSize() throws Exception { + doMaxFrameSizeInfluencesOutgoingFrameSizeTestImpl(1000, 10001, 11); + doMaxFrameSizeInfluencesOutgoingFrameSizeTestImpl(1500, 6001, 5); + } + + private void doMaxFrameSizeInfluencesOutgoingFrameSizeTestImpl(int frameSize, int bytesPayloadSize, int numFrames) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + testPeer.expectSaslLayerDisabledConnect(equalTo(UnsignedInteger.valueOf(frameSize))); + // Each connection creates a session for managing temporary destinations etc + testPeer.expectBegin(); + + String uri = "amqp://localhost:" + testPeer.getServerPort() + "?amqp.saslLayer=false&amqp.maxFrameSize=" + frameSize; + ConnectionFactory factory = new JmsConnectionFactory(uri); + Connection connection = factory.createConnection(); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + MessageProducer producer = session.createProducer(queue); + + // Expect n-1 transfers of maxFrameSize + for (int i = 1; i < numFrames; i++) { + testPeer.expectTransfer(frameSize); + } + // Plus one more of unknown size (framing overhead). + testPeer.expectTransfer(0); + + // Send the message + byte[] orig = createBytePyload(bytesPayloadSize); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(orig); + + producer.send(message); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } + + private static byte[] createBytePyload(int sizeInBytes) { + Random rand = new Random(System.currentTimeMillis()); + + byte[] payload = new byte[sizeInBytes]; + for (int i = 0; i < sizeInBytes; i++) { + payload[i] = (byte) (64 + 1 + rand.nextInt(9)); + } + + return payload; + } + + @Test(timeout = 20000) public void testAmqpHostnameSetByDefault() throws Exception { doAmqpHostnameTestImpl("localhost", false, equalTo("localhost")); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java index 889cd58..101d773 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java @@ -42,21 +42,25 @@ public abstract class AbstractFrameFieldAndPayloadMatchingHandler extends Abstra private AmqpPeerRunnable _onCompletion; + private int _expectedFrameSize; + protected AbstractFrameFieldAndPayloadMatchingHandler(FrameType frameType, int channel, + int frameSize, UnsignedLong numericDescriptor, Symbol symbolicDescriptor) { super(numericDescriptor, symbolicDescriptor); _frameType = frameType; _expectedChannel = channel; + _expectedFrameSize = frameSize; } protected abstract void verifyPayload(Binary payload) throws AssertionError; @SuppressWarnings("unchecked") @Override - public final void frame(int type, int ch, DescribedType dt, Binary payload, TestAmqpPeer peer) + public final void frame(int type, int ch, int frameSize, DescribedType dt, Binary payload, TestAmqpPeer peer) { if(type == _frameType.ordinal() && (_expectedChannel == ANY_CHANNEL || _expectedChannel == ch) @@ -85,6 +89,14 @@ public abstract class AbstractFrameFieldAndPayloadMatchingHandler extends Abstra peer.assertionFailed(ae); } + if(_expectedFrameSize != 0 && _expectedFrameSize != frameSize) { + throw new IllegalArgumentException(String.format( + "Frame size was not as expected. Expected: " + + "type=%s, channel=%s, size=%s, descriptor=%s/%s but got: " + + "type=%s, channel=%s, size=%s", + _frameType.ordinal(), expectedChannelString(), _expectedFrameSize, getNumericDescriptor(), getSymbolicDescriptor(), type, ch, frameSize)); + } + if(_onCompletion != null) { _onCompletion.run(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java index c188dc2..79f9b95 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java @@ -23,5 +23,5 @@ import org.apache.qpid.proton.amqp.DescribedType; interface FrameHandler extends Handler { - void frame(int type, int channel, DescribedType describedType, Binary payload, TestAmqpPeer peer); + void frame(int type, int channel, int frameBodySize, DescribedType describedType, Binary payload, TestAmqpPeer peer); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java index 1d2feb1..5070fc1 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java @@ -30,7 +30,7 @@ public class FrameWithNoPayloadMatchingHandler extends AbstractFrameFieldAndPayl UnsignedLong numericDescriptor, Symbol symbolicDescriptor) { - super(frameType, channel, numericDescriptor, symbolicDescriptor); + super(frameType, channel, 0, numericDescriptor, symbolicDescriptor); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java index fcfbc33..16989ba 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java @@ -35,7 +35,16 @@ public class FrameWithPayloadMatchingHandler extends AbstractFrameFieldAndPayloa UnsignedLong numericDescriptor, Symbol symbolicDescriptor) { - super(frameType, channel, numericDescriptor, symbolicDescriptor); + super(frameType, channel, 0, numericDescriptor, symbolicDescriptor); + } + + protected FrameWithPayloadMatchingHandler(FrameType frameType, + int channel, + int frameSize, + UnsignedLong numericDescriptor, + Symbol symbolicDescriptor) + { + super(frameType, channel, frameSize, numericDescriptor, symbolicDescriptor); } public void setPayloadMatcher(Matcher<Binary> payloadMatcher) http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 8eae69b..ed28c82 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -294,7 +294,7 @@ public class TestAmqpPeer implements AutoCloseable } } - void receiveFrame(int type, int channel, DescribedType describedType, Binary payload) + void receiveFrame(int type, int channel, int frameSize, DescribedType describedType, Binary payload) { Handler handler = getFirstHandler(); if(handler == null) @@ -307,7 +307,7 @@ public class TestAmqpPeer implements AutoCloseable if(handler instanceof FrameHandler) { - ((FrameHandler)handler).frame(type, channel, describedType, payload, this); + ((FrameHandler)handler).frame(type, channel, frameSize, describedType, payload, this); removeFirstHandler(); } else @@ -2063,16 +2063,27 @@ public class TestAmqpPeer implements AutoCloseable expectTransfer(expectedPayloadMatcher, nullValue(), false, true, new Accepted(), true); } + public void expectTransfer(int frameSize) + { + expectTransfer(null, nullValue(), false, true, new Accepted(), true, frameSize); + } + public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled, ListDescribedType responseState, boolean responseSettled) { expectTransfer(expectedPayloadMatcher, stateMatcher, settled, true, responseState, responseSettled); } - //TODO: fix responseState to only admit applicable types. public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled, boolean sendResponseDisposition, ListDescribedType responseState, boolean responseSettled) { + expectTransfer(expectedPayloadMatcher, stateMatcher, settled, sendResponseDisposition, responseState, responseSettled, 0); + } + + //TODO: fix responseState to only admit applicable types. + public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled, + boolean sendResponseDisposition, ListDescribedType responseState, boolean responseSettled, int frameSize) +{ Matcher<Boolean> settledMatcher = null; if(settled) { @@ -2083,7 +2094,7 @@ public class TestAmqpPeer implements AutoCloseable settledMatcher = Matchers.anyOf(equalTo(false), nullValue()); } - final TransferMatcher transferMatcher = new TransferMatcher(); + final TransferMatcher transferMatcher = new TransferMatcher(frameSize); transferMatcher.setPayloadMatcher(expectedPayloadMatcher); transferMatcher.withSettled(settledMatcher); transferMatcher.withState(stateMatcher); @@ -2121,7 +2132,7 @@ public class TestAmqpPeer implements AutoCloseable { Matcher<Boolean> settledMatcher = Matchers.anyOf(equalTo(false), nullValue()); - final TransferMatcher transferMatcher = new TransferMatcher(); + final TransferMatcher transferMatcher = new TransferMatcher(0); transferMatcher.setPayloadMatcher(expectedPayloadMatcher); transferMatcher.withSettled(settledMatcher); transferMatcher.withState(nullValue()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java index 63799c7..7b9770a 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java @@ -329,7 +329,7 @@ class TestFrameParser payload = null; } - _peer.receiveFrame(type, channel, describedType, payload); + _peer.receiveFrame(type, channel, size, describedType, payload); } else { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/TransferMatcher.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/TransferMatcher.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/TransferMatcher.java index 17e3eb3..19efcd7 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/TransferMatcher.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/TransferMatcher.java @@ -48,10 +48,14 @@ public class TransferMatcher extends FrameWithPayloadMatchingHandler BATCHABLE, } - public TransferMatcher() + /** + * @param frameSize the size to check, or 0 if not to check the size. + */ + public TransferMatcher(int frameSize) { super(FrameType.AMQP, ANY_CHANNEL, + frameSize, UnsignedLong.valueOf(0x0000000000000014L), Symbol.valueOf("amqp:transfer:list")); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java index a49b8a4..458e103 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java @@ -21,6 +21,8 @@ package org.apache.qpid.jms.test.testpeer.matchers.sections; +import static org.hamcrest.MatcherAssert.assertThat; + import org.apache.qpid.proton.amqp.Binary; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -44,6 +46,8 @@ public class TransferPayloadCompositeMatcher extends TypeSafeMatcher<Binary> private String _msgContentMatcherFailureDescription; private ApplicationPropertiesSectionMatcher _appPropsMatcher; private String _appPropsMatcherFailureDescription; + private Matcher<Integer> _payloadLengthMatcher; + private String _payloadLenthMatcherFailureDescription; public TransferPayloadCompositeMatcher() { @@ -55,6 +59,21 @@ public class TransferPayloadCompositeMatcher extends TypeSafeMatcher<Binary> int origLength = receivedBinary.getLength(); int bytesConsumed = 0; + // Length Matcher + if(_payloadLengthMatcher != null) + { + try + { + assertThat("Payload length should match", origLength, _payloadLengthMatcher); + } + catch(Throwable t) + { + _payloadLenthMatcherFailureDescription = "\nPayload Lenfth Matcher generated throwable: " + t; + + return false; + } + } + //MessageHeader Section if(_msgHeadersMatcher != null) { @@ -156,6 +175,14 @@ public class TransferPayloadCompositeMatcher extends TypeSafeMatcher<Binary> { mismatchDescription.appendText("\nActual encoded form of the full Transfer frame payload: ").appendValue(item); + //Payload Length + if(_payloadLenthMatcherFailureDescription != null) + { + mismatchDescription.appendText("\nPayloadLengthMatcherFailed!"); + mismatchDescription.appendText(_payloadLenthMatcherFailureDescription); + return; + } + //MessageHeaders Section if(_msgHeaderMatcherFailureDescription != null) { @@ -221,4 +248,9 @@ public class TransferPayloadCompositeMatcher extends TypeSafeMatcher<Binary> { _msgContentMatcher = msgContentMatcher; } + + public void setPayloadLengthMatcher(Matcher<Integer> payloadLengthMatcher) + { + _payloadLengthMatcher = payloadLengthMatcher; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8f876f92/qpid-jms-docs/Configuration.md ---------------------------------------------------------------------- diff --git a/qpid-jms-docs/Configuration.md b/qpid-jms-docs/Configuration.md index 40210cf..9d26eb4 100644 --- a/qpid-jms-docs/Configuration.md +++ b/qpid-jms-docs/Configuration.md @@ -201,7 +201,7 @@ These options apply to the behaviour of certain AMQP functionality. + **amqp.vhost** The vhost to connect to. Used to populate the Sasl and Open hostname fields. Default is the main hostname from the Connection URI. + **amqp.saslLayer** Controls whether connections should use a SASL layer or not. Default is true. + **amqp.saslMechanisms** Which SASL mechanism(s) the client should allow selection of, if offered by the server and usable with the configured credentials. Comma separated if specifying more than 1 mechanism. The clients supported mechanisms are currently EXTERNAL, SCRAM-SHA-256, SCRAM-SHA-1, CRAM-MD5, PLAIN, XOAUTH2, ANONYMOUS, and GSSAPI for Kerberos. Default is to allow selection from all mechanisms except GSSAPI, which must be specified here to enable. -+ **amqp.maxFrameSize** The max-frame-size value in bytes that is advertised to the peer. Default is 1048576. ++ **amqp.maxFrameSize** The connection max-frame-size value in bytes. Default is 1048576. + **amqp.drainTimeout** The time in milliseconds that the client will wait for a response from the remote when a consumer drain request is made. If no response is seen in the allotted timeout period the link will be considered failed and the associated consumer will be closed. Default is 60000. + **amqp.allowNonSecureRedirects** Controls whether an AMQP connection will allow for a redirect to an alternative host over a connection that is not secure when the existing connection is secure, e.g. redirecting an SSL connection to a raw TCP connection. This value defaults to false. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
