Updated Branches: refs/heads/trunk 0f0c0d676 -> 283cdd050
https://issues.apache.org/jira/browse/AMQ-4914 Up the max frame size to a value of 1mb so that we are restricted by the default in the QPid client which is 32k. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/283cdd05 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/283cdd05 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/283cdd05 Branch: refs/heads/trunk Commit: 283cdd0502d61e5ee339c80de5c0ff27f05c239f Parents: 0f0c0d6 Author: Timothy Bish <[email protected]> Authored: Wed Jan 8 11:27:39 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Wed Jan 8 11:27:39 2014 -0500 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 10 +++++++ .../activemq/transport/amqp/AmqpWireFormat.java | 27 +++++++++++------- .../transport/amqp/bugs/AMQ4914Test.java | 29 +++++++++++++------- 3 files changed, 46 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/283cdd05/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 1c7c0e2..1af64e3 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -126,6 +126,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { public AmqpProtocolConverter(AmqpTransport transport) { this.amqpTransport = transport; + + int maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE; + + // AMQ-4914 - Setting the max frame size 
too large stalls out the QPid client on sends or + // consume due to no session credit. Once fixed we should set this value using + // the configured maxFrameSize on the URI. + //int maxFrameSize = transport.getWireFormat().getMaxFrameSize() > Integer.MAX_VALUE ? + // Integer.MAX_VALUE : (int) transport.getWireFormat().getMaxFrameSize(); + + this.protonTransport.setMaxFrameSize(maxFrameSize); this.protonTransport.bind(this.protonConnection); updateTracer(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/283cdd05/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index 631011e..4a11374 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -16,22 +16,26 @@ */ package org.apache.activemq.transport.amqp; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.Buffer; -import java.io.*; - -/** - */ public class AmqpWireFormat implements WireFormat { + public static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1; private int version = 1; - private long maxFrameSize = 1024*1024*100; + private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; + @Override public ByteSequence marshal(Object command) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); @@ 
-40,27 +44,31 @@ public class AmqpWireFormat implements WireFormat { return baos.toByteSequence(); } + @Override public Object unmarshal(ByteSequence packet) throws IOException { ByteArrayInputStream stream = new ByteArrayInputStream(packet); DataInputStream dis = new DataInputStream(stream); return unmarshal(dis); } + @Override public void marshal(Object command, DataOutput dataOut) throws IOException { Buffer frame = (Buffer) command; frame.writeTo(dataOut); } boolean magicRead = false; + + @Override public Object unmarshal(DataInput dataIn) throws IOException { - if( !magicRead ) { + if (!magicRead) { Buffer magic = new Buffer(8); magic.readFrom(dataIn); magicRead = true; return new AmqpHeader(magic); } else { int size = dataIn.readInt(); - if( size > maxFrameSize) { + if (size > maxFrameSize) { throw new AmqpProtocolException("Frame size exceeded max frame length."); } Buffer frame = new Buffer(size); @@ -71,8 +79,7 @@ public class AmqpWireFormat implements WireFormat { } } - /** - */ + @Override public void setVersion(int version) { this.version = version; } @@ -80,11 +87,11 @@ public class AmqpWireFormat implements WireFormat { /** * @return the version of the wire format */ + @Override public int getVersion() { return this.version; } - public long getMaxFrameSize() { return maxFrameSize; } http://git-wip-us.apache.org/repos/asf/activemq/blob/283cdd05/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java index 1dde725..aefae8b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java @@ -16,15 +16,9 @@ */ package 
org.apache.activemq.transport.amqp.bugs; -import org.apache.activemq.transport.amqp.AmqpTestSupport; -import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; -import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import javax.jms.Connection; import javax.jms.ExceptionListener; @@ -35,7 +29,15 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import static org.junit.Assert.*; +import org.apache.activemq.transport.amqp.AmqpTestSupport; +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQ4914Test extends AmqpTestSupport { @Rule @@ -67,6 +69,13 @@ public class AMQ4914Test extends AmqpTestSupport { } } + @Test(timeout = 2 * 60 * 1000) + public void testSendFixedSizedMessages() throws JMSException { + doTestSendLargeMessage(65536); + doTestSendLargeMessage(65536 * 2); + doTestSendLargeMessage(65536 * 4); + } + @Ignore("AMQ-4914") @Test(timeout = 2 * 60 * 1000) public void testSendLargeMessages() throws JMSException {
