Repository: activemq Updated Branches: refs/heads/master 9445e93ae -> a5c2f3f42
https://issues.apache.org/jira/browse/AMQ-5731 Add some additional checks and handlers for frames with an invalid size prefix and ensure that the connection state is torn down broker side. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a5c2f3f4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a5c2f3f4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a5c2f3f4 Branch: refs/heads/master Commit: a5c2f3f42373a7e091ea1a933c23cf7298b23f60 Parents: 9445e93 Author: Timothy Bish <[email protected]> Authored: Thu Apr 16 14:53:22 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu Apr 16 14:53:22 2015 -0400 ---------------------------------------------------------------------- .../activemq/transport/amqp/AmqpWireFormat.java | 2 + .../transport/amqp/protocol/AmqpConnection.java | 9 ++ .../interop/AmqpCorruptedFrameHandlingTest.java | 90 +++++++++++++++++++- 3 files changed, 100 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a5c2f3f4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index 3734cc5..570fd2b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -104,6 +104,8 @@ public class AmqpWireFormat implements WireFormat { int size = dataIn.readInt(); if (size > maxFrameSize) { throw new AmqpProtocolException("Frame size exceeded max frame length."); + } else if (size <= 0) { + throw new AmqpProtocolException("Frame size value was invalid: " + size); } Buffer frame = new Buffer(size); frame.bigEndianEditor().writeInt(size); http://git-wip-us.apache.org/repos/asf/activemq/blob/a5c2f3f4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index b8c6997..bab16c9 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -497,6 +497,15 @@ public class AmqpConnection implements AmqpProtocolConverter { public void onAMQPException(IOException error) { closedSocket = true; if (!closing) { + try { + closing = true; + // Attempt to inform the other end that we are going to close + // so that the client doesn't wait around forever. + protonConnection.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage())); + protonConnection.close(); + pumpProtonToSocket(); + } catch (Exception ignore) { + } amqpTransport.sendToActiveMQ(error); } else { try { http://git-wip-us.apache.org/repos/asf/activemq/blob/a5c2f3f4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java index 5ee08ca..3c57ecd 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java @@ -33,8 +33,13 @@ import org.junit.Test; */ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport { + @Override + protected String getAdditionalConfig() { + return "?transport.wireFormat.maxFrameSize=65535"; + } + @Test(timeout = 60000) - public void testCanConnect() throws Exception { + public void testHandlingCorruptedFramePayload() throws Exception { Random random = new Random(); random.setSeed(System.nanoTime()); @@ -65,5 +70,88 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport { })); connection.close(); + + // Should be able to recycle the client ID now. + connection = client.createConnection(); + connection.setContainerId("ClientID:" + getTestName()); + connection.connect(); + } + + @Test(timeout = 60000) + public void testHandleFrameWithNegativeSize() throws Exception { + Random random = new Random(); + random.setSeed(System.nanoTime()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.createConnection(); + + connection.setContainerId("ClientID:" + getTestName()); + connection.connect(); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + // Send frame with valid size prefix, but corrupted payload. + byte[] corruptedFrame = new byte[1024]; + random.nextBytes(corruptedFrame); + corruptedFrame[0] = (byte) 0xFF; + corruptedFrame[1] = 0x0; + corruptedFrame[2] = 0x4; + corruptedFrame[3] = 0x0; + + connection.sendRawBytes(corruptedFrame); + + assertTrue("Connection should have dropped.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + + connection.close(); + + // Should be able to recycle the client ID now. + connection = client.createConnection(); + connection.setContainerId("ClientID:" + getTestName()); + connection.connect(); + } + + @Test(timeout = 60000) + public void testHandleFrameSizeExceedsMaxFrameSize() throws Exception { + Random random = new Random(); + random.setSeed(System.nanoTime()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.createConnection(); + + connection.setContainerId("ClientID:" + getTestName()); + connection.connect(); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + // Send frame with valid size prefix, but corrupted payload. + byte[] corruptedFrame = new byte[1024]; + random.nextBytes(corruptedFrame); + corruptedFrame[0] = 0x0; + corruptedFrame[1] = 0x7F; + corruptedFrame[2] = 0x7F; + corruptedFrame[3] = 0x7F; + + connection.sendRawBytes(corruptedFrame); + + assertTrue("Connection should have dropped.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + + connection.close(); + + // Should be able to recycle the client ID now. + connection = client.createConnection(); + connection.setContainerId("ClientID:" + getTestName()); + connection.connect(); } }
