Repository: activemq Updated Branches: refs/heads/trunk 570dbb437 -> 87420cc45
Fix for AMQ-5093. amqp+nio and amqp+nio+ssl were failing on large messages Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/87420cc4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/87420cc4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/87420cc4 Branch: refs/heads/trunk Commit: 87420cc455f6e02d5249bfe29b105f4e1e345969 Parents: 570dbb4 Author: Kevin Earls <[email protected]> Authored: Tue Mar 11 12:00:16 2014 +0100 Committer: Kevin Earls <[email protected]> Committed: Tue Mar 11 12:00:16 2014 +0100 ---------------------------------------------------------------------- .../transport/amqp/AmqpNioSslTransport.java | 137 +-------------- .../transport/amqp/AmqpNioTransport.java | 64 ++----- .../transport/amqp/AmqpNioTransportHelper.java | 175 +++++++++++++++++++ .../activemq/transport/amqp/JMSClientTest.java | 36 ++++ .../transport/amqp/joram/JoramJmsNioTest.java | 2 + 5 files changed, 229 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/87420cc4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java index 76e6f64..c569f05 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java @@ -18,13 +18,8 @@ package org.apache.activemq.transport.amqp; import org.apache.activemq.transport.nio.NIOSSLTransport; import org.apache.activemq.wireformat.WireFormat; -import org.fusesource.hawtbuf.Buffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.net.SocketFactory; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.io.IOException; import java.net.Socket; import java.net.URI; @@ -32,10 +27,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; public class AmqpNioSslTransport extends NIOSSLTransport { - private DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'})); - public final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt(); - private static final Logger LOG = LoggerFactory.getLogger(AmqpNioSslTransport.class); - private boolean magicConsumed = false; + private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this); public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -55,131 +47,6 @@ public class AmqpNioSslTransport extends NIOSSLTransport { @Override protected void processCommand(ByteBuffer plain) throws Exception { - // Are we waiting for the next Command or are we building on the current one? The - // frame size is in the first 4 bytes. - if (nextFrameSize == -1) { - // We can get small packets that don't give us enough for the frame size - // so allocate enough for the initial size value and - if (plain.remaining() < 4) { - if (currentBuffer == null) { - currentBuffer = ByteBuffer.allocate(4); - } - - // Go until we fill the integer sized current buffer. - while (currentBuffer.hasRemaining() && plain.hasRemaining()) { - currentBuffer.put(plain.get()); - } - - // Didn't we get enough yet to figure out next frame size. - if (currentBuffer.hasRemaining()) { - return; - } else { - currentBuffer.flip(); - nextFrameSize = currentBuffer.getInt(); - } - } else { - // Either we are completing a previous read of the next frame size or its - // fully contained in plain already. - if (currentBuffer != null) { - // Finish the frame size integer read and get from the current buffer. - while (currentBuffer.hasRemaining()) { - currentBuffer.put(plain.get()); - } - - currentBuffer.flip(); - nextFrameSize = currentBuffer.getInt(); - } else { - nextFrameSize = plain.getInt(); - } - } - } - - // There are three possibilities when we get here. We could have a partial frame, - // a full frame, or more than 1 frame - while (true) { - LOG.debug("Entering while loop with plain.position {} remaining {} ", plain.position(), plain.remaining()); - // handle headers, which start with 'A','M','Q','P' rather than size - if (nextFrameSize == AMQP_HEADER_VALUE) { - nextFrameSize = handleAmqpHeader(plain); - if (nextFrameSize == -1) { - return; - } - } - - validateFrameSize(nextFrameSize); - - // now we have the data, let's reallocate and try to fill it, (currentBuffer.putInt() is called - // because we need to put back the 4 bytes we read to determine the size) - currentBuffer = ByteBuffer.allocate(nextFrameSize ); - currentBuffer.putInt(nextFrameSize); - if (currentBuffer.remaining() >= plain.remaining()) { - currentBuffer.put(plain); - } else { - byte[] fill = new byte[currentBuffer.remaining()]; - plain.get(fill); - currentBuffer.put(fill); - } - - // Either we have enough data for a new command or we have to wait for some more. If hasRemaining is true, - // we have not filled the buffer yet, i.e. we haven't received the full frame. - if (currentBuffer.hasRemaining()) { - return; - } else { - currentBuffer.flip(); - LOG.debug("Calling doConsume with position {} limit {}", currentBuffer.position(), currentBuffer.limit()); - doConsume(AmqpSupport.toBuffer(currentBuffer)); - - // Determine if there are more frames to process - if (plain.hasRemaining()) { - if (plain.remaining() < 4) { - nextFrameSize = 4; - } else { - nextFrameSize = plain.getInt(); - } - } else { - nextFrameSize = -1; - currentBuffer = null; - return; - } - } - } + amqpNioTransportHelper.processCommand(plain); } - - private void validateFrameSize(int frameSize) throws IOException { - if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) { - throw new IOException("Frame size of " + nextFrameSize + - "larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE); - } - } - - private int handleAmqpHeader(ByteBuffer plain) { - int nextFrameSize; - - LOG.debug("Consuming AMQP_HEADER"); - currentBuffer = ByteBuffer.allocate(8); - currentBuffer.putInt(AMQP_HEADER_VALUE); - while (currentBuffer.hasRemaining()) { - currentBuffer.put(plain.get()); - } - currentBuffer.flip(); - if (!magicConsumed) { // The first case we see is special and has to be handled differently - doConsume(new AmqpHeader(new Buffer(currentBuffer))); - magicConsumed = true; - } else { - doConsume(AmqpSupport.toBuffer(currentBuffer)); - } - - if (plain.hasRemaining()) { - if (plain.remaining() < 4) { - nextFrameSize = 4; - } else { - nextFrameSize = plain.getInt(); - } - } else { - nextFrameSize = -1; - currentBuffer = null; - } - return nextFrameSize; - } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/87420cc4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java index e94cb0b..ee2694c 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java @@ -16,8 +16,17 @@ */ package org.apache.activemq.transport.amqp; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; +import org.apache.activemq.transport.nio.NIOOutputStream; +import org.apache.activemq.transport.nio.SelectorManager; +import org.apache.activemq.transport.nio.SelectorSelection; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.SocketFactory; import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; @@ -28,26 +37,14 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import javax.net.SocketFactory; - -import org.apache.activemq.transport.nio.NIOOutputStream; -import org.apache.activemq.transport.nio.SelectorManager; -import org.apache.activemq.transport.nio.SelectorSelection; -import org.apache.activemq.transport.tcp.TcpTransport; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.wireformat.WireFormat; -import org.fusesource.hawtbuf.Buffer; - /** * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO */ public class AmqpNioTransport extends TcpTransport { - private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'})); - private final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt(); - + private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransport.class); private SocketChannel channel; private SelectorSelection selection; + private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this); private ByteBuffer inputBuffer; @@ -110,40 +107,7 @@ public class AmqpNioTransport extends TcpTransport { receiveCounter += readSize; inputBuffer.flip(); - - if( !magicRead ) { - if( inputBuffer.remaining()>= 8 ) { - magicRead = true; - Buffer magic = new Buffer(8); - for (int i = 0; i < 8; i++) { - magic.data[i] = inputBuffer.get(); - } - doConsume(new AmqpHeader(magic)); - } else { - inputBuffer.flip(); - continue; - } - } - - while(inputBuffer.position() < inputBuffer.limit()) { - inputBuffer.mark(); - int commandSize = inputBuffer.getInt(); - inputBuffer.reset(); - - // handles buffers starting with 'A','M','Q','P' rather than size - if (commandSize == AMQP_HEADER_VALUE) { - doConsume(AmqpSupport.toBuffer(inputBuffer)); - break; - } - - byte[] bytes = new byte[commandSize]; - ByteBuffer commandBuffer = ByteBuffer.allocate(commandSize); - inputBuffer.get(bytes, 0, commandSize); - commandBuffer.put(bytes); - commandBuffer.flip(); - doConsume(AmqpSupport.toBuffer(commandBuffer)); - commandBuffer.clear(); - } + amqpNioTransportHelper.processCommand(inputBuffer); // clear the buffer inputBuffer.clear(); http://git-wip-us.apache.org/repos/asf/activemq/blob/87420cc4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java new file mode 100644 index 0000000..09cab5d --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.transport.TransportSupport; +import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class AmqpNioTransportHelper { + private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'})); + private final Integer AMQP_HEADER_VALUE; + private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransportHelper.class); + protected int nextFrameSize = -1; + protected ByteBuffer currentBuffer; + private boolean magicConsumed = false; + private TransportSupport transportSupport; + + public AmqpNioTransportHelper(TransportSupport transportSupport) throws IOException { + AMQP_HEADER_VALUE = amqpHeaderValue.readInt(); + this.transportSupport = transportSupport; + } + + protected void processCommand(ByteBuffer plain) throws Exception { + // Are we waiting for the next Command or building on the current one? The frame size is in the first 4 bytes. + if (nextFrameSize == -1) { + // We can get small packets that don't give us enough for the frame size + // so allocate enough for the initial size value and + if (plain.remaining() < 4) { + if (currentBuffer == null) { + currentBuffer = ByteBuffer.allocate(4); + } + + // Go until we fill the integer sized current buffer. + while (currentBuffer.hasRemaining() && plain.hasRemaining()) { + currentBuffer.put(plain.get()); + } + + // Didn't we get enough yet to figure out next frame size. + if (currentBuffer.hasRemaining()) { + return; + } else { + currentBuffer.flip(); + nextFrameSize = currentBuffer.getInt(); + } + } else { + // Either we are completing a previous read of the next frame size or its + // fully contained in plain already. + if (currentBuffer != null) { + // Finish the frame size integer read and get from the current buffer. + while (currentBuffer.hasRemaining()) { + currentBuffer.put(plain.get()); + } + + currentBuffer.flip(); + nextFrameSize = currentBuffer.getInt(); + } else { + nextFrameSize = plain.getInt(); + } + } + } + + // There are three possibilities when we get here. We could have a partial frame, + // a full frame, or more than 1 frame + while (true) { + // handle headers, which start with 'A','M','Q','P' rather than size + if (nextFrameSize == AMQP_HEADER_VALUE) { + nextFrameSize = handleAmqpHeader(plain); + if (nextFrameSize == -1) { + return; + } + } + validateFrameSize(nextFrameSize); + + // now we have the data, let's reallocate and try to fill it, (currentBuffer.putInt() is called TODO update + // because we need to put back the 4 bytes we read to determine the size) + if (currentBuffer == null || (currentBuffer.limit() == 4)) { + currentBuffer = ByteBuffer.allocate(nextFrameSize); + currentBuffer.putInt(nextFrameSize); + } + + if (currentBuffer.remaining() >= plain.remaining()) { + currentBuffer.put(plain); + } else { + byte[] fill = new byte[currentBuffer.remaining()]; + plain.get(fill); + currentBuffer.put(fill); + } + + // Either we have enough data for a new command or we have to wait for some more. If hasRemaining is true, + // we have not filled the buffer yet, i.e. we haven't received the full frame. + if (currentBuffer.hasRemaining()) { + return; + } else { + currentBuffer.flip(); + LOG.debug("Calling doConsume with position {} limit {}", currentBuffer.position(), currentBuffer.limit()); + transportSupport.doConsume(AmqpSupport.toBuffer(currentBuffer)); + currentBuffer = null; + nextFrameSize = -1; + + // Determine if there are more frames to process + if (plain.hasRemaining()) { + if (plain.remaining() < 4) { + currentBuffer = ByteBuffer.allocate(4); + while (currentBuffer.hasRemaining() && plain.hasRemaining()) { + currentBuffer.put(plain.get()); + } + return; + } else { + nextFrameSize = plain.getInt(); + } + } else { + return; + } + } + } + } + + private void validateFrameSize(int frameSize) throws IOException { + if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) { + throw new IOException("Frame size of " + nextFrameSize + + "larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE); + } + } + + private int handleAmqpHeader(ByteBuffer plain) { + int nextFrameSize; + + LOG.debug("Consuming AMQP_HEADER"); + currentBuffer = ByteBuffer.allocate(8); + currentBuffer.putInt(AMQP_HEADER_VALUE); + while (currentBuffer.hasRemaining()) { + currentBuffer.put(plain.get()); + } + currentBuffer.flip(); + if (!magicConsumed) { // The first case we see is special and has to be handled differently + transportSupport.doConsume(new AmqpHeader(new Buffer(currentBuffer))); + magicConsumed = true; + } else { + transportSupport.doConsume(AmqpSupport.toBuffer(currentBuffer)); + } + + if (plain.hasRemaining()) { + if (plain.remaining() < 4) { + nextFrameSize = 4; + } else { + nextFrameSize = plain.getInt(); + } + } else { + nextFrameSize = -1; + currentBuffer = null; + } + return nextFrameSize; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/87420cc4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 6239904..7ae8ef7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -784,6 +784,42 @@ public class JMSClientTest extends AmqpTestSupport { connection.close(); } + private String createLargeString(int sizeInBytes) { + byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0}; + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < sizeInBytes; i++) { + builder.append(base[i % base.length]); + } + + LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes"); + return builder.toString(); + } + + @Test(timeout = 60 * 1000) + public void testSendLargeMessage() throws JMSException, InterruptedException { + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = name.toString(); + Queue queue = session.createQueue(queueName); + + MessageProducer producer=session.createProducer(queue); + int messageSize = 1024 * 1024; + String messageText = createLargeString(messageSize); + Message m=session.createTextMessage(messageText); + LOG.debug("Sending message of {} bytes on queue {}", messageSize, queueName); + producer.send(m); + + MessageConsumer consumer=session.createConsumer(queue); + + Message message = consumer.receive(); + assertNotNull(message); + assertTrue(message instanceof TextMessage); + TextMessage textMessage = (TextMessage) message; + LOG.debug(">>>> Received message of length {}", textMessage.getText().length()); + assertEquals(messageSize, textMessage.getText().length()); + assertEquals(messageText, textMessage.getText()); + } + private Connection createConnection() throws JMSException { return createConnection(name.toString(), false, false); } http://git-wip-us.apache.org/repos/asf/activemq/blob/87420cc4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioTest.java index afb6ece..7c78d0f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.joram; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.rules.Timeout; import org.junit.runner.RunWith; @@ -41,6 +42,7 @@ import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest; /** * Run the JoramJmsTests using amqp+nio */ +@Ignore("AMQ-5094") @RunWith(Suite.class) @Suite.SuiteClasses({ // TopicSessionTest.class, // Hangs, see https://issues.apache.org/jira/browse/PROTON-154
