Repository: activemq
Updated Branches:
  refs/heads/master eece06b32 -> 31834ed1f


https://issues.apache.org/jira/browse/AMQ-5731

Ensure that a ShutdownInfo is propagated to the Broker when the AMQP
transport encounters an unrecoverable error.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/31834ed1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/31834ed1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/31834ed1

Branch: refs/heads/master
Commit: 31834ed1fb77a413a62b7fc59d186d2371a0b728
Parents: eece06b
Author: Timothy Bish <[email protected]>
Authored: Wed Apr 15 18:30:13 2015 -0400
Committer: Timothy Bish <[email protected]>
Committed: Wed Apr 15 18:30:13 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpConnection.java |  4 +-
 .../transport/amqp/client/AmqpConnection.java   | 25 +++++++
 .../interop/AmqpCorruptedFrameHandlingTest.java | 69 ++++++++++++++++++++
 3 files changed, 97 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/31834ed1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index c977c8f..b8c6997 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -419,7 +419,7 @@ public class AmqpConnection implements 
AmqpProtocolConverter {
                 }
             });
 
-            sendToActiveMQ(new ShutdownInfo(), null);
+            sendToActiveMQ(new ShutdownInfo());
         }
     }
 
@@ -655,6 +655,8 @@ public class AmqpConnection implements 
AmqpProtocolConverter {
         exception.printStackTrace();
         LOG.debug("Exception detail", exception);
         try {
+            // Must ensure that the broker removes Connection resources.
+            sendToActiveMQ(new ShutdownInfo());
             amqpTransport.stop();
         } catch (Throwable e) {
             LOG.error("Failed to stop AMQP Transport ", e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/31834ed1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index b4fd661..eafbd1b 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -234,6 +234,31 @@ public class AmqpConnection extends 
AmqpAbstractResource<Connection> implements
         return session;
     }
 
+    //----- Access to low level IO for specific test cases 
-------------------//
+
+    public void sendRawBytes(final byte[] rawData) throws Exception {
+        checkClosed();
+
+        final ClientFuture request = new ClientFuture();
+
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                try {
+                    transport.send(ByteBuffer.wrap(rawData));
+                } catch (IOException e) {
+                    fireClientException(e);
+                } finally {
+                    request.onSuccess();
+                }
+            }
+        });
+
+        request.sync();
+    }
+
     //----- Configuration accessors 
------------------------------------------//
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/31834ed1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
new file mode 100644
index 0000000..5ee08ca
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+
+/**
+ * Test that broker closes connection and allows a new one when the transport
+ * receives a bad chunk of data after a successful connect.
+ */
+public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
+
+    @Test(timeout = 60000)
+    public void testCanConnect() throws Exception {
+        Random random = new Random();
+        random.setSeed(System.nanoTime());
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+
+        connection.setContainerId("ClientID:" + getTestName());
+        connection.connect();
+
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        // Send frame with valid size prefix, but corrupted payload.
+        byte[] corruptedFrame = new byte[1024];
+        random.nextBytes(corruptedFrame);
+        corruptedFrame[0] = 0x0;
+        corruptedFrame[1] = 0x0;
+        corruptedFrame[2] = 0x4;
+        corruptedFrame[3] = 0x0;
+
+        connection.sendRawBytes(corruptedFrame);
+
+        assertTrue("Connection should have dropped.", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 0;
+            }
+        }));
+
+        connection.close();
+    }
+}

Reply via email to