Repository: activemq Updated Branches: refs/heads/trunk dc607bbf3 -> 2360fb859
Fix for AMQ-5073, updated AmqpNioSslTransport.java to properly handle frames. Also fixed bugs in amqp test, as seen in AMQ-5062 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2360fb85 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2360fb85 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2360fb85 Branch: refs/heads/trunk Commit: 2360fb859694bacac1e48092e53a56b388e1d2f0 Parents: dc607bb Author: Kevin Earls <[email protected]> Authored: Fri Feb 28 10:58:15 2014 +0100 Committer: Kevin Earls <[email protected]> Committed: Fri Feb 28 10:58:15 2014 +0100 ---------------------------------------------------------------------- activemq-amqp/pom.xml | 13 ++ .../transport/amqp/AmqpNioSslTransport.java | 149 ++++++++++++++++--- .../transport/amqp/AmqpTestSupport.java | 83 ++++++++++- .../transport/amqp/JMSClientNioPlusSslTest.java | 33 ++++ .../transport/amqp/JMSClientNioTest.java | 2 +- .../transport/amqp/JMSClientSslTest.java | 54 +++++++ .../activemq/transport/amqp/JMSClientTest.java | 19 ++- .../amqp/joram/JoramJmsNioPlusSslTest.java | 1 - 8 files changed, 320 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml index 56990f4..77d41ec 100644 --- a/activemq-amqp/pom.xml +++ b/activemq-amqp/pom.xml @@ -160,6 +160,19 @@ </execution> </executions> </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> + <surefire.argLine>-Xmx512M -Djava.awt.headless=true</surefire.argLine> + <runOrder>alphabetical</runOrder> + <forkedProcessTimeoutInSeconds>120</forkedProcessTimeoutInSeconds> + <includes> + <include>**/*Test.*</include> + 
</includes> + </configuration> + </plugin> </plugins> </build> </profile> http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java index ee0dc78..76e6f64 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java @@ -16,21 +16,26 @@ */ package org.apache.activemq.transport.amqp; +import org.apache.activemq.transport.nio.NIOSSLTransport; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.SocketFactory; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.net.Socket; import java.net.URI; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import javax.net.SocketFactory; - -import org.apache.activemq.transport.nio.NIOSSLTransport; -import org.apache.activemq.wireformat.WireFormat; -import org.fusesource.hawtbuf.Buffer; - public class AmqpNioSslTransport extends NIOSSLTransport { - - private final ByteBuffer magic = ByteBuffer.allocate(8); + private DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'})); + public final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt(); + private static final Logger LOG = LoggerFactory.getLogger(AmqpNioSslTransport.class); + private boolean magicConsumed = false; public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, 
IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -50,27 +55,131 @@ public class AmqpNioSslTransport extends NIOSSLTransport { @Override protected void processCommand(ByteBuffer plain) throws Exception { + // Are we waiting for the next Command or are we building on the current one? The + // frame size is in the first 4 bytes. + if (nextFrameSize == -1) { + // We can get small packets that don't give us enough for the frame size + // so allocate enough for the initial size value and + if (plain.remaining() < 4) { + if (currentBuffer == null) { + currentBuffer = ByteBuffer.allocate(4); + } + + // Go until we fill the integer sized current buffer. + while (currentBuffer.hasRemaining() && plain.hasRemaining()) { + currentBuffer.put(plain.get()); + } + + // Didn't we get enough yet to figure out next frame size. + if (currentBuffer.hasRemaining()) { + return; + } else { + currentBuffer.flip(); + nextFrameSize = currentBuffer.getInt(); + } + } else { + // Either we are completing a previous read of the next frame size or its + // fully contained in plain already. + if (currentBuffer != null) { + // Finish the frame size integer read and get from the current buffer. + while (currentBuffer.hasRemaining()) { + currentBuffer.put(plain.get()); + } - byte[] fill = new byte[plain.remaining()]; - plain.get(fill); + currentBuffer.flip(); + nextFrameSize = currentBuffer.getInt(); + } else { + nextFrameSize = plain.getInt(); + } + } + } - ByteBuffer payload = ByteBuffer.wrap(fill); + // There are three possibilities when we get here. 
We could have a partial frame, + // a full frame, or more than 1 frame + while (true) { + LOG.debug("Entering while loop with plain.position {} remaining {} ", plain.position(), plain.remaining()); + // handle headers, which start with 'A','M','Q','P' rather than size + if (nextFrameSize == AMQP_HEADER_VALUE) { + nextFrameSize = handleAmqpHeader(plain); + if (nextFrameSize == -1) { + return; + } + } - if (magic.position() != 8) { + validateFrameSize(nextFrameSize); - while (payload.hasRemaining() && magic.position() < 8) { - magic.put(payload.get()); + // now we have the data, let's reallocate and try to fill it, (currentBuffer.putInt() is called + // because we need to put back the 4 bytes we read to determine the size) + currentBuffer = ByteBuffer.allocate(nextFrameSize ); + currentBuffer.putInt(nextFrameSize); + if (currentBuffer.remaining() >= plain.remaining()) { + currentBuffer.put(plain); + } else { + byte[] fill = new byte[currentBuffer.remaining()]; + plain.get(fill); + currentBuffer.put(fill); } - if (!magic.hasRemaining()) { - magic.flip(); - doConsume(new AmqpHeader(new Buffer(magic))); - magic.position(8); + // Either we have enough data for a new command or we have to wait for some more. If hasRemaining is true, + // we have not filled the buffer yet, i.e. we haven't received the full frame. 
+ if (currentBuffer.hasRemaining()) { + return; + } else { + currentBuffer.flip(); + LOG.debug("Calling doConsume with position {} limit {}", currentBuffer.position(), currentBuffer.limit()); + doConsume(AmqpSupport.toBuffer(currentBuffer)); + + // Determine if there are more frames to process + if (plain.hasRemaining()) { + if (plain.remaining() < 4) { + nextFrameSize = 4; + } else { + nextFrameSize = plain.getInt(); + } + } else { + nextFrameSize = -1; + currentBuffer = null; + return; + } } } + } + + private void validateFrameSize(int frameSize) throws IOException { + if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) { + throw new IOException("Frame size of " + nextFrameSize + + "larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE); + } + } + + private int handleAmqpHeader(ByteBuffer plain) { + int nextFrameSize; - if (payload.hasRemaining()) { - doConsume(AmqpSupport.toBuffer(payload)); + LOG.debug("Consuming AMQP_HEADER"); + currentBuffer = ByteBuffer.allocate(8); + currentBuffer.putInt(AMQP_HEADER_VALUE); + while (currentBuffer.hasRemaining()) { + currentBuffer.put(plain.get()); } + currentBuffer.flip(); + if (!magicConsumed) { // The first case we see is special and has to be handled differently + doConsume(new AmqpHeader(new Buffer(currentBuffer))); + magicConsumed = true; + } else { + doConsume(AmqpSupport.toBuffer(currentBuffer)); + } + + if (plain.hasRemaining()) { + if (plain.remaining() < 4) { + nextFrameSize = 4; + } else { + nextFrameSize = plain.getInt(); + } + } else { + nextFrameSize = -1; + currentBuffer = null; + } + return nextFrameSize; } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index 23c895e..a44874f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -20,6 +20,12 @@ import java.io.File; import java.security.SecureRandom; import java.util.Set; import java.util.Vector; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.jms.Connection; import javax.jms.Destination; @@ -68,7 +74,20 @@ public class AmqpTestSupport { @Before public void setUp() throws Exception { exceptions.clear(); - startBroker(); + if (killHungThreads("setUp")) { + LOG.warn("HUNG THREADS in setUp"); + } + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future<Boolean> future = executor.submit(new SetUpTask()); + try { + LOG.debug("SetUpTask started."); + Boolean result = future.get(60, TimeUnit.SECONDS); + } catch (TimeoutException e) { + throw new Exception("startBroker timed out"); + } + executor.shutdownNow(); + this.numberOfMessages = 2000; } @@ -130,16 +149,51 @@ public class AmqpTestSupport { } public void stopBroker() throws Exception { + LOG.debug("entering AmqpTestSupport.stopBroker"); if (brokerService != null) { brokerService.stop(); brokerService.waitUntilStopped(); brokerService = null; } + LOG.debug("exiting AmqpTestSupport.stopBroker"); } @After public void tearDown() throws Exception { - stopBroker(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future<Boolean> future = executor.submit(new TearDownTask()); + try { + LOG.debug("tearDown started."); + Boolean result = future.get(60, TimeUnit.SECONDS); + } catch (TimeoutException e) { + throw new Exception("startBroker timed out"); + } + 
executor.shutdownNow(); + + if (killHungThreads("tearDown")) { + LOG.warn("HUNG THREADS in setUp"); + } + } + + private boolean killHungThreads(String stage) throws Exception{ + Thread.sleep(500); + if (Thread.activeCount() == 1) { + return false; + } + LOG.warn("Hung Thread(s) on {} entry threadCount {} ", stage, Thread.activeCount()); + + Thread[] threads = new Thread[Thread.activeCount()]; + Thread.enumerate(threads); + for (int i=0; i < threads.length; i++) { + Thread t = threads[i]; + if (!t.getName().equals("main")) { + LOG.warn("KillHungThreads: Interrupting thread {}", t.getName()); + t.interrupt(); + } + } + + LOG.warn("Hung Thread on {} exit threadCount {} ", stage, Thread.activeCount()); + return true; } public void sendMessages(Connection connection, Destination destination, int count) throws Exception { @@ -191,4 +245,29 @@ public class AmqpTestSupport { .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); return proxy; } + + public class SetUpTask implements Callable<Boolean> { + private String testName; + + @Override + public Boolean call() throws Exception { + LOG.debug("in SetUpTask.call, calling startBroker"); + startBroker(); + + return Boolean.TRUE; + } + } + + public class TearDownTask implements Callable<Boolean> { + private String testName; + + @Override + public Boolean call() throws Exception { + LOG.debug("in TearDownTask.call(), calling stopBroker"); + stopBroker(); + + return Boolean.TRUE; + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java new file mode 100644 index 0000000..d39b53c --- /dev/null +++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the JMS client when connected to the NIO+SSL transport. 
+ */ +public class JMSClientNioPlusSslTest extends JMSClientSslTest { + protected static final Logger LOG = LoggerFactory.getLogger(JMSClientNioPlusSslTest.class); + + @Override + protected int getBrokerPort() { + LOG.debug("JMSClientNioPlusSslTest.getBrokerPort returning nioPlusSslPort {}", nioPlusSslPort); + return nioPlusSslPort; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java index 7392482..68f93c3 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java @@ -29,12 +29,12 @@ import java.io.DataInputStream; /** * Test the JMS client when connected to the NIO transport. 
*/ -@Ignore public class JMSClientNioTest extends JMSClientTest { protected static final Logger LOG = LoggerFactory.getLogger(JMSClientNioTest.class); @Override protected int getBrokerPort() { + LOG.debug("JMSClientNioTest.getBrokerPort returning nioPort {}", nioPort); return nioPort; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java new file mode 100644 index 0000000..2a7445b --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.amqp; + +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import java.security.SecureRandom; + +/** + * Test the JMS client when connected to the SSL transport. + */ +public class JMSClientSslTest extends JMSClientTest { + protected static final Logger LOG = LoggerFactory.getLogger(JMSClientSslTest.class); + + @BeforeClass + public static void beforeClass() throws Exception { + SSLContext ctx = SSLContext.getInstance("TLS"); + ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); + SSLContext.setDefault(ctx); + } + + @Override + protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException { + LOG.debug("JMSClientSslTest.createConnection called with clientId {} syncPublish {} useSsl {}", clientId, syncPublish, useSsl); + return super.createConnection(clientId, syncPublish, true); + } + + @Override + protected int getBrokerPort() { + LOG.debug("JMSClientSslTest.getBrokerPort returning sslPort {}", sslPort); + return sslPort; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index be5a92b..1c6bc79 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -57,7 +57,6 @@ import org.objectweb.jtests.jms.framework.TestConfig; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -@Ignore public class JMSClientTest extends AmqpTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class); @Rule public TestName name = new TestName(); @@ -353,7 +352,7 @@ public class JMSClientTest extends AmqpTestSupport { } } - @Test(timeout=30000) + @Test(timeout=90000) public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception { Connection connection = createConnection(); @@ -377,7 +376,7 @@ public class JMSClientTest extends AmqpTestSupport { try { for (int i = 0; i < 10; ++i) { consumer.receiveNoWait(); - TimeUnit.SECONDS.sleep(1); + TimeUnit.MILLISECONDS.sleep(1000 + (i * 100)); } fail("Should have thrown an IllegalStateException"); } catch (Exception ex) { @@ -385,7 +384,7 @@ public class JMSClientTest extends AmqpTestSupport { } } - @Test(timeout=30000) + @Test(timeout=60000) public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws Exception { Connection connection = createConnection(); @@ -408,7 +407,7 @@ public class JMSClientTest extends AmqpTestSupport { try { for (int i = 0; i < 10; ++i) { - consumer.receive(1000); + consumer.receive(1000 + (i * 100)); } fail("Should have thrown an IllegalStateException"); } catch (Exception ex) { @@ -753,15 +752,15 @@ public class JMSClientTest extends AmqpTestSupport { } private Connection createConnection() throws JMSException { - return createConnection(name.toString(), false); + return createConnection(name.toString(), false, false); } private Connection createConnection(boolean syncPublish) throws JMSException { - return createConnection(name.toString(), syncPublish); + return createConnection(name.toString(), syncPublish, false); } private Connection createConnection(String clientId) throws JMSException { - return createConnection(clientId, false); + return createConnection(clientId, false, false); } /** @@ -773,11 +772,11 @@ public class JMSClientTest extends AmqpTestSupport { return port; } - private Connection 
createConnection(String clientId, boolean syncPublish) throws JMSException { + protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException { int brokerPort = getBrokerPort(); LOG.debug("Creating connection on port {}", brokerPort); - final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password"); + final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl); factory.setSyncPublish(syncPublish); factory.setTopicPrefix("topic://"); http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java index d1b63d8..168b077 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java @@ -48,7 +48,6 @@ import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Ignore @RunWith(Suite.class) @Suite.SuiteClasses({ // TopicSessionTest.class, // Hangs, see https://issues.apache.org/jira/browse/PROTON-154
