Repository: activemq
Updated Branches:
  refs/heads/master eece06b32 -> 31834ed1f
https://issues.apache.org/jira/browse/AMQ-5731 Ensure that a ShutdownInfo is propagated to the Broker when the AMQP transport encounters an unrecoverable error. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/31834ed1 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/31834ed1 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/31834ed1 Branch: refs/heads/master Commit: 31834ed1fb77a413a62b7fc59d186d2371a0b728 Parents: eece06b Author: Timothy Bish <[email protected]> Authored: Wed Apr 15 18:30:13 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Apr 15 18:30:13 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/protocol/AmqpConnection.java | 4 +- .../transport/amqp/client/AmqpConnection.java | 25 +++++++ .../interop/AmqpCorruptedFrameHandlingTest.java | 69 ++++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/31834ed1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index c977c8f..b8c6997 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -419,7 +419,7 @@ public class AmqpConnection implements AmqpProtocolConverter { } }); - sendToActiveMQ(new ShutdownInfo(), null); + sendToActiveMQ(new ShutdownInfo()); } } @@ -655,6 +655,8 @@ public class AmqpConnection implements AmqpProtocolConverter { 
exception.printStackTrace(); LOG.debug("Exception detail", exception); try { + // Must ensure that the broker removes Connection resources. + sendToActiveMQ(new ShutdownInfo()); amqpTransport.stop(); } catch (Throwable e) { LOG.error("Failed to stop AMQP Transport ", e); http://git-wip-us.apache.org/repos/asf/activemq/blob/31834ed1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index b4fd661..eafbd1b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -234,6 +234,31 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements return session; } + //----- Access to low level IO for specific test cases -------------------// + + public void sendRawBytes(final byte[] rawData) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + + serializer.execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + transport.send(ByteBuffer.wrap(rawData)); + } catch (IOException e) { + fireClientException(e); + } finally { + request.onSuccess(); + } + } + }); + + request.sync(); + } + //----- Configuration accessors ------------------------------------------// /** http://git-wip-us.apache.org/repos/asf/activemq/blob/31834ed1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java new file mode 100644 index 0000000..5ee08ca --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Random; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.util.Wait; +import org.junit.Test; + +/** + * Test that broker closes connection and allows a new one when the transport + * receives a bad chunk of data after a successful connect. 
+ */ +public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testCanConnect() throws Exception { + Random random = new Random(); + random.setSeed(System.nanoTime()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.createConnection(); + + connection.setContainerId("ClientID:" + getTestName()); + connection.connect(); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + // Send frame with valid size prefix, but corrupted payload. + byte[] corruptedFrame = new byte[1024]; + random.nextBytes(corruptedFrame); + corruptedFrame[0] = 0x0; + corruptedFrame[1] = 0x0; + corruptedFrame[2] = 0x4; + corruptedFrame[3] = 0x0; + + connection.sendRawBytes(corruptedFrame); + + assertTrue("Connection should have dropped.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + + connection.close(); + } +}
