Repository: activemq Updated Branches: refs/heads/trunk b5c6c1eae -> f2653e693
Clean up a bit, remove commented out code from other transports. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f2653e69 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f2653e69 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f2653e69 Branch: refs/heads/trunk Commit: f2653e69362cef9de49e6b0a5a248c7ad0602b6b Parents: b5c6c1e Author: Timothy Bish <[email protected]> Authored: Wed May 28 15:27:34 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed May 28 15:27:34 2014 -0400 ---------------------------------------------------------------------- .../amqp/AMQPProtocolDiscriminator.java | 16 ++-- .../transport/amqp/AMQPSslTransportFactory.java | 16 ---- .../transport/amqp/ActiveMQJMSVendor.java | 22 ++--- .../activemq/transport/amqp/AmqpHeader.java | 29 ++++--- .../transport/amqp/AmqpNioSslTransport.java | 12 +-- .../amqp/AmqpNioSslTransportFactory.java | 4 +- .../transport/amqp/AmqpNioTransport.java | 28 ++++--- .../transport/amqp/AmqpNioTransportFactory.java | 39 ++++----- .../transport/amqp/AmqpNioTransportHelper.java | 50 ++++++----- .../transport/amqp/AmqpProtocolConverter.java | 88 +++++++++++--------- .../transport/amqp/AmqpProtocolException.java | 1 - .../activemq/transport/amqp/AmqpSupport.java | 12 ++- .../activemq/transport/amqp/AmqpTransport.java | 7 +- .../transport/amqp/AmqpTransportFactory.java | 28 ++----- .../transport/amqp/AmqpTransportFilter.java | 33 ++++---- .../transport/amqp/AmqpWireFormatFactory.java | 2 + .../transport/amqp/ResponseHandler.java | 3 +- 17 files changed, 189 insertions(+), 201 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java index 8a03d09..bf0c655 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java @@ -16,11 +16,11 @@ */ package org.apache.activemq.transport.amqp; -import org.apache.activemq.command.Command; - import java.io.IOException; import java.util.ArrayList; +import org.apache.activemq.command.Command; + /** * Used to assign the best implementation of a AmqpProtocolConverter to the * AmqpTransport based on the AmqpHeader that the client sends us. @@ -31,12 +31,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { interface Discriminator { boolean matches(AmqpHeader header); + IAmqpProtocolConverter create(AmqpTransport transport); } static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>(); static { - DISCRIMINATORS.add(new Discriminator(){ + DISCRIMINATORS.add(new Discriminator() { @Override public IAmqpProtocolConverter create(AmqpTransport transport) { @@ -45,11 +46,12 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { @Override public boolean matches(AmqpHeader header) { - switch( header.getProtocolId() ) { + switch (header.getProtocolId()) { case 0: case 3: - if( header.getMajor() == 1 && header.getMinor()==0 && header.getRevision()==0 ) + if (header.getMajor() == 1 && header.getMinor() == 0 && header.getRevision() == 0) { return true; + } } return false; } @@ -70,12 +72,12 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { Discriminator match = null; for (Discriminator discriminator : DISCRIMINATORS) { - if( discriminator.matches(header) ) { + if (discriminator.matches(header)) { match = discriminator; } } // Lets use first in the list if none are a good match. - if( match == null ) { + if (match == null) { match = DISCRIMINATORS.get(0); } IAmqpProtocolConverter next = match.create(transport); http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java index 0612fd9..4c036a9 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java @@ -58,11 +58,6 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok transport = ((MutexTransport) transport).getNext(); } - // MutexTransport mutex = transport.narrow(MutexTransport.class); - // if (mutex != null) { - // mutex.setSyncOnCommand(true); - // } - return transport; } @@ -71,17 +66,6 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok this.brokerContext = brokerService.getBrokerContext(); } - // protected Transport createInactivityMonitor(Transport transport, - // WireFormat format) { - // AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, - // format); - // - // AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class); - // filter.setInactivityMonitor(monitor); - // - // return monitor; - // } - @Override protected boolean isUseInactivityMonitor(Transport transport) { return false; http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java index bb81ca4..8a6137c 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java @@ -42,13 +42,13 @@ import org.apache.activemq.command.ActiveMQTopic; import org.apache.qpid.proton.jms.JMSVendor; /** - * @author <a href="http://hiramchirino.com">Hiram Chirino</a> */ public class ActiveMQJMSVendor extends JMSVendor { final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor(); - private ActiveMQJMSVendor() {} + private ActiveMQJMSVendor() { + } @Override public BytesMessage createBytesMessage() { @@ -87,16 +87,16 @@ public class ActiveMQJMSVendor extends JMSVendor { @Override public <T extends Destination> T createDestination(String name, Class<T> kind) { - if( kind == Queue.class ) { + if (kind == Queue.class) { return kind.cast(new ActiveMQQueue(name)); } - if( kind == Topic.class ) { + if (kind == Topic.class) { return kind.cast(new ActiveMQTopic(name)); } - if( kind == TemporaryQueue.class ) { + if (kind == TemporaryQueue.class) { return kind.cast(new ActiveMQTempQueue(name)); } - if( kind == TemporaryTopic.class ) { + if (kind == TemporaryTopic.class) { return kind.cast(new ActiveMQTempTopic(name)); } return kind.cast(ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE)); @@ -104,26 +104,26 @@ public class ActiveMQJMSVendor extends JMSVendor { @Override public void setJMSXUserID(Message msg, String value) { - ((ActiveMQMessage)msg).setUserID(value); + ((ActiveMQMessage) msg).setUserID(value); } @Override public void setJMSXGroupID(Message msg, String value) { - ((ActiveMQMessage)msg).setGroupID(value); + ((ActiveMQMessage) msg).setGroupID(value); } @Override public void setJMSXGroupSequence(Message msg, int value) { - ((ActiveMQMessage)msg).setGroupSequence(value); + ((ActiveMQMessage) msg).setGroupSequence(value); } @Override public void setJMSXDeliveryCount(Message msg, long value) { - ((ActiveMQMessage)msg).setRedeliveryCounter((int) value); + ((ActiveMQMessage) msg).setRedeliveryCounter((int) value); } @Override public String toAddress(Destination dest) { - return ((ActiveMQDestination)dest).getQualifiedName(); + return ((ActiveMQDestination) dest).getQualifiedName(); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java index 07d0222..aaf5944 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java @@ -19,65 +19,64 @@ package org.apache.activemq.transport.amqp; import org.fusesource.hawtbuf.Buffer; /** - * @author <a href="http://hiramchirino.com">Hiram Chirino</a> */ public class AmqpHeader { - static final Buffer PREFIX = new Buffer(new byte[]{ - 'A', 'M', 'Q', 'P' - }); + static final Buffer PREFIX = new Buffer(new byte[] { 'A', 'M', 'Q', 'P' }); private Buffer buffer; - public AmqpHeader(){ - this(new Buffer(new byte[]{ - 'A', 'M', 'Q', 'P', 0, 1, 0, 0 - })); + public AmqpHeader() { + this(new Buffer(new byte[] { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 })); } - public AmqpHeader(Buffer buffer){ + public AmqpHeader(Buffer buffer) { setBuffer(buffer); } public int getProtocolId() { return buffer.get(4) & 0xFF; } + public void setProtocolId(int value) { - buffer.data[buffer.offset+4] = (byte) value; + buffer.data[buffer.offset + 4] = (byte) value; } public int getMajor() { return buffer.get(5) & 0xFF; } + public void setMajor(int value) { - buffer.data[buffer.offset+5] = (byte) value; + buffer.data[buffer.offset + 5] = (byte) value; } public int getMinor() { return buffer.get(6) & 0xFF; } + public void setMinor(int value) { - buffer.data[buffer.offset+6] = (byte) value; + buffer.data[buffer.offset + 6] = (byte) value; } public int getRevision() { return buffer.get(7) & 0xFF; } + public void setRevision(int value) { - buffer.data[buffer.offset+7] = (byte) value; + buffer.data[buffer.offset + 7] = (byte) value; } public Buffer getBuffer() { return buffer; } + public void setBuffer(Buffer value) { - if( !value.startsWith(PREFIX) || value.length()!=8 ) { + if (!value.startsWith(PREFIX) || value.length() != 8) { throw new IllegalArgumentException("Not an AMQP header buffer"); } buffer = value.buffer(); } - @Override public String toString() { return buffer.toString(); http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java index c569f05..4eb0e6f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java @@ -16,18 +16,20 @@ */ package org.apache.activemq.transport.amqp; -import org.apache.activemq.transport.nio.NIOSSLTransport; -import org.apache.activemq.wireformat.WireFormat; - -import javax.net.SocketFactory; import java.io.IOException; import java.net.Socket; import java.net.URI; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import javax.net.SocketFactory; + +import org.apache.activemq.transport.nio.NIOSSLTransport; +import org.apache.activemq.wireformat.WireFormat; + public class AmqpNioSslTransport extends NIOSSLTransport { - private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this); + + private final AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this); public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java index 2306cc5..5e2fa06 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java @@ -35,11 +35,12 @@ import org.apache.activemq.wireformat.WireFormat; public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory { - SSLContext context; + protected SSLContext context; @Override protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { return new TcpTransportServer(this, location, serverSocketFactory) { + @Override protected Transport createTransport(Socket socket, WireFormat format) throws IOException { AmqpNioSslTransport transport = new AmqpNioSslTransport(format, socket); if (context != null) { @@ -71,5 +72,4 @@ public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory { } return super.doBind(location); } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java index ee2694c..ff58404 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java @@ -16,17 +16,6 @@ */ package org.apache.activemq.transport.amqp; -import org.apache.activemq.transport.nio.NIOOutputStream; -import org.apache.activemq.transport.nio.SelectorManager; -import org.apache.activemq.transport.nio.SelectorSelection; -import org.apache.activemq.transport.tcp.TcpTransport; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.wireformat.WireFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.SocketFactory; import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; @@ -37,14 +26,28 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import javax.net.SocketFactory; + +import org.apache.activemq.transport.nio.NIOOutputStream; +import org.apache.activemq.transport.nio.SelectorManager; +import org.apache.activemq.transport.nio.SelectorSelection; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO */ public class AmqpNioTransport extends TcpTransport { + private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransport.class); + private SocketChannel channel; private SelectorSelection selection; - private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this); + private final AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this); private ByteBuffer inputBuffer; @@ -71,6 +74,7 @@ public class AmqpNioTransport extends TcpTransport { @Override public void onError(SelectorSelection selection, Throwable error) { + LOG.trace("Error detected: {}", error.getMessage()); if (error instanceof IOException) { onException((IOException) error); } else { http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java index fde7a16..c67d3b6 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java @@ -16,6 +16,17 @@ */ package org.apache.activemq.transport.amqp; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; + import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; @@ -27,16 +38,6 @@ import org.apache.activemq.transport.tcp.TcpTransportServer; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.wireformat.WireFormat; -import javax.net.ServerSocketFactory; -import javax.net.SocketFactory; -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.Map; - /** * A <a href="http://amqp.org/">AMQP</a> over NIO transport factory */ @@ -44,18 +45,22 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok private BrokerContext brokerContext = null; + @Override protected String getDefaultWireFormatType() { return "amqp"; } + @Override protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { return new TcpTransportServer(this, location, serverSocketFactory) { + @Override protected Transport createTransport(Socket socket, WireFormat format) throws IOException { return new AmqpNioTransport(format, socket); } }; } + @Override protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { return new AmqpNioTransport(wf, socketFactory, location, localLocation); } @@ -70,14 +75,10 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok transport = ((MutexTransport)transport).getNext(); } -// MutexTransport mutex = transport.narrow(MutexTransport.class); -// if (mutex != null) { -// mutex.setSyncOnCommand(true); -// } - return transport; } + @Override @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { transport = new AmqpTransportFilter(transport, format, brokerContext); @@ -85,17 +86,11 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok return super.compositeConfigure(transport, format, options); } + @Override public void setBrokerService(BrokerService brokerService) { this.brokerContext = brokerService.getBrokerContext(); } -// protected Transport createInactivityMonitor(Transport transport, WireFormat format) { -// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format); -// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class); -// filter.setInactivityMonitor(monitor); -// return monitor; -// } - @Override protected boolean isUseInactivityMonitor(Transport transport) { return false; http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java index 076fff4..021c289 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java @@ -16,24 +16,25 @@ */ package org.apache.activemq.transport.amqp; -import org.apache.activemq.transport.TransportSupport; -import org.fusesource.hawtbuf.Buffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.activemq.transport.TransportSupport; +import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class AmqpNioTransportHelper { - private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'})); + + private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[] { 'A', 'M', 'Q', 'P' })); private final Integer AMQP_HEADER_VALUE; private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransportHelper.class); protected int nextFrameSize = -1; protected ByteBuffer currentBuffer; private boolean magicConsumed = false; - private TransportSupport transportSupport; + private final TransportSupport transportSupport; public AmqpNioTransportHelper(TransportSupport transportSupport) throws IOException { AMQP_HEADER_VALUE = amqpHeaderValue.readInt(); @@ -41,10 +42,11 @@ public class AmqpNioTransportHelper { } protected void processCommand(ByteBuffer plain) throws Exception { - // Are we waiting for the next Command or building on the current one? The frame size is in the first 4 bytes. + // Are we waiting for the next Command or building on the current one? + // The frame size is in the first 4 bytes. if (nextFrameSize == -1) { - // We can get small packets that don't give us enough for the frame size - // so allocate enough for the initial size value and + // We can get small packets that don't give us enough for the frame + // size so allocate enough for the initial size value and if (plain.remaining() < 4) { if (currentBuffer == null) { currentBuffer = ByteBuffer.allocate(4); @@ -63,10 +65,11 @@ public class AmqpNioTransportHelper { nextFrameSize = currentBuffer.getInt(); } } else { - // Either we are completing a previous read of the next frame size or its - // fully contained in plain already. + // Either we are completing a previous read of the next frame + // size or its fully contained in plain already. if (currentBuffer != null) { - // Finish the frame size integer read and get from the current buffer. + // Finish the frame size integer read and get from the + // current buffer. while (currentBuffer.hasRemaining()) { currentBuffer.put(plain.get()); } @@ -79,8 +82,8 @@ public class AmqpNioTransportHelper { } } - // There are three possibilities when we get here. We could have a partial frame, - // a full frame, or more than 1 frame + // There are three possibilities when we get here. We could have a + // partial frame, a full frame, or more than 1 frame while (true) { // handle headers, which start with 'A','M','Q','P' rather than size if (nextFrameSize == AMQP_HEADER_VALUE) { @@ -91,8 +94,10 @@ public class AmqpNioTransportHelper { } validateFrameSize(nextFrameSize); - // now we have the data, let's reallocate and try to fill it, (currentBuffer.putInt() is called TODO update - // because we need to put back the 4 bytes we read to determine the size) + // now we have the data, let's reallocate and try to fill it, + // (currentBuffer.putInt() is called TODO update + // because we need to put back the 4 bytes we read to determine the + // size) if (currentBuffer == null || (currentBuffer.limit() == 4)) { currentBuffer = ByteBuffer.allocate(nextFrameSize); currentBuffer.putInt(nextFrameSize); @@ -106,8 +111,9 @@ public class AmqpNioTransportHelper { currentBuffer.put(fill); } - // Either we have enough data for a new command or we have to wait for some more. If hasRemaining is true, - // we have not filled the buffer yet, i.e. we haven't received the full frame. + // Either we have enough data for a new command or we have to wait for some more. + // If hasRemaining is true, we have not filled the buffer yet, i.e. we haven't + // received the full frame. if (currentBuffer.hasRemaining()) { return; } else { @@ -137,8 +143,7 @@ public class AmqpNioTransportHelper { private void validateFrameSize(int frameSize) throws IOException { if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) { - throw new IOException("Frame size of " + nextFrameSize + - "larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE); + throw new IOException("Frame size of " + nextFrameSize + "larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE); } } @@ -152,7 +157,7 @@ public class AmqpNioTransportHelper { currentBuffer.put(plain.get()); } currentBuffer.flip(); - if (!magicConsumed) { // The first case we see is special and has to be handled differently + if (!magicConsumed) { // The first case we see is special and has to be handled differently transportSupport.doConsume(new AmqpHeader(new Buffer(currentBuffer))); magicConsumed = true; } else { @@ -172,5 +177,4 @@ public class AmqpNioTransportHelper { return nextFrameSize; } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index f8e5686..ee84a25 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -110,36 +110,38 @@ import org.slf4j.LoggerFactory; class AmqpProtocolConverter implements IAmqpProtocolConverter { - static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES; + private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES; private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class); - static final public byte[] EMPTY_BYTE_ARRAY = new byte[] {}; + private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; private final AmqpTransport amqpTransport; private static final Symbol COPY = Symbol.getSymbol("copy"); private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector"); private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); - private static final ProtonFactoryLoader<MessageFactory> messageFactoryLoader = - new ProtonFactoryLoader<MessageFactory>(MessageFactory.class); + private static final ProtonFactoryLoader<MessageFactory> messageFactoryLoader = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class); - int prefetch = 100; - - EngineFactory engineFactory = new EngineFactoryImpl(); - Transport protonTransport = engineFactory.createTransport(); - Connection protonConnection = engineFactory.createConnection(); - MessageFactory messageFactory = messageFactoryLoader.loadFactory(); - Collector eventCollector = new CollectorImpl(); + protected int prefetch = 100; + protected EngineFactory engineFactory = new EngineFactoryImpl(); + protected Transport protonTransport = engineFactory.createTransport(); + protected Connection protonConnection = engineFactory.createConnection(); + protected MessageFactory messageFactory = messageFactoryLoader.loadFactory(); + protected Collector eventCollector = new CollectorImpl(); public AmqpProtocolConverter(AmqpTransport transport) { this.amqpTransport = transport; int maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE; - // AMQ-4914 - Setting the max frame size to large stalls out the QPid client on sends or - // consume due to no session credit. Once fixed we should set this value using - // the configured maxFrameSize on the URI. - //int maxFrameSize = transport.getWireFormat().getMaxFrameSize() > Integer.MAX_VALUE ? - // Integer.MAX_VALUE : (int) transport.getWireFormat().getMaxFrameSize(); + // AMQ-4914 - Setting the max frame size to large stalls out the QPid + // client on sends or + // consume due to no session credit. Once fixed we should set this value + // using + // the configured maxFrameSize on the URI. + // int maxFrameSize = transport.getWireFormat().getMaxFrameSize() > + // Integer.MAX_VALUE ? + // Integer.MAX_VALUE : (int) + // transport.getWireFormat().getMaxFrameSize(); this.protonTransport.setMaxFrameSize(maxFrameSize); this.protonTransport.bind(this.protonConnection); @@ -245,7 +247,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { if (parts.length > 1) { connectionInfo.setPassword(parts[1].utf8().toString()); } - // We can't really auth at this point since we don't know the client id yet.. :( + // We can't really auth at this point since we don't + // know the client id yet.. :( sasl.done(Sasl.SaslOutcome.PN_SASL_OK); amqpTransport.getWireFormat().magicRead = false; sasl = null; @@ -371,7 +374,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { if (rh != null) { rh.onResponse(this, response); } else { - // Pass down any unexpected errors. Should this close the connection? + // Pass down any unexpected errors. Should this close the + // connection? if (response.isException()) { Throwable exception = ((ExceptionResponse) response).getException(); handleException(exception); @@ -393,7 +397,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { - // Pass down any unexpected async errors. Should this close the connection? + // Pass down any unexpected async errors. Should this close the + // connection? Throwable exception = ((ConnectionError) command).getException(); handleException(exception); } else if (command.isBrokerInfo()) { @@ -413,9 +418,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { abstract public void onDelivery(Delivery delivery) throws Exception; - public void onClose() throws Exception {} + public void onClose() throws Exception { + } - public void drainCheck() {} + public void drainCheck() { + } abstract void doCommit() throws Exception; @@ -544,10 +551,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } @Override - void doCommit() throws Exception {} + void doCommit() throws Exception { + } @Override - void doRollback() throws Exception {} + void doRollback() throws Exception { + } abstract protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception; } @@ -575,7 +584,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } message.setProducerId(producerId); - // Always override the AMQP client's MessageId with our own. Preserve the + // Always override the AMQP client's MessageId with our own. + // Preserve the // original in the TextView property for later Ack. MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); @@ -599,8 +609,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { message.setTransactionId(new LocalTransactionId(connectionId, txid)); } - // Lets handle the case where the expiration was set, but the timestamp - // was not set by the client. Lets assign the timestamp now, and adjust the + // Lets handle the case where the expiration was set, but the + // timestamp + // was not set by the client. Lets assign the timestamp now, and + // adjust the // expiration. if (message.getExpiration() != 0) { if (message.getTimestamp() == 0) { @@ -624,8 +636,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { delivery.disposition(rejected); } else { if (receiver.getCredit() <= (prefetch * .2)) { - LOG.trace("Sending more credit ({}) to producer: {}", - prefetch - receiver.getCredit(), producerId); + LOG.trace("Sending more credit ({}) to producer: {}", prefetch - receiver.getCredit(), producerId); receiver.flow(prefetch - receiver.getCredit()); } @@ -638,8 +649,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { }); } else { if (receiver.getCredit() <= (prefetch * .2)) { - LOG.trace("Sending more credit ({}) to producer: {}", - prefetch - receiver.getCredit(), producerId); + LOG.trace("Sending more credit ({}) to producer: {}", prefetch - receiver.getCredit(), producerId); receiver.flow(prefetch - receiver.getCredit()); pumpProtonToSocket(); } @@ -942,8 +952,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { ActiveMQMessage temp = null; if (md.getMessage() != null) { - // Topics can dispatch the same Message to more than one consumer - // so we must copy to prevent concurrent read / write to the same + // Topics can dispatch the same Message to more than one + // consumer + // so we must copy to prevent concurrent read / write to + // the same // message object. if (md.getDestination().isTopic()) { synchronized (md.getMessage()) { @@ -993,7 +1005,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } if (ackType == -1) { - // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ + // we are going to settle, but redeliver.. we we won't yet ack + // to ActiveMQ delivery.settle(); onMessageDispatch((MessageDispatch) delivery.getContext()); } else { @@ -1013,7 +1026,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { LocalTransactionId localTxId = new LocalTransactionId(connectionId, txid); ack.setTransactionId(localTxId); - // Store the message sent in this TX we might need to re-send on rollback + // Store the message sent in this TX we might need to + // re-send on rollback md.getMessage().setTransactionId(localTxId); dispatchedInTx.addFirst(md); } @@ -1042,7 +1056,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { public void drainCheck() { // If we are a browser.. lets not say we are drained until // we hit the end of browse message. - if( info.isBrowser() && !endOfBrowse) + if (info.isBrowser() && !endOfBrowse) return; if (outbound.isEmpty()) { @@ -1058,11 +1072,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { if (state instanceof TransactionalState) { TransactionalState txState = (TransactionalState) state; if (txState.getOutcome() instanceof DeliveryState) { - LOG.trace("onDelivery: TX delivery state = {}", state); - state = (DeliveryState) txState.getOutcome(); - if (state instanceof Accepted) { if (!delivery.remotelySettled()) { delivery.disposition(new Accepted()); @@ -1073,7 +1084,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } else { if (state instanceof Accepted) { LOG.trace("onDelivery: accepted state = {}", state); - if (!delivery.remotelySettled()) { delivery.disposition(new Accepted()); } http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java index 94cdf70..3c94778 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java @@ -18,7 +18,6 @@ package org.apache.activemq.transport.amqp; import java.io.IOException; - public class AmqpProtocolException extends IOException { private static final long serialVersionUID = -2869735532997332242L; http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java index 4e992b7..e3680c5 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -16,28 +16,26 @@ */ package org.apache.activemq.transport.amqp; -import org.fusesource.hawtbuf.Buffer; - import java.nio.ByteBuffer; +import org.fusesource.hawtbuf.Buffer; + /** - * @author <a href="http://hiramchirino.com">Hiram Chirino</a> */ public class AmqpSupport { static public Buffer toBuffer(ByteBuffer data) { - if( data == null ) { + if (data == null) { return null; } Buffer rc; - if( data.isDirect() ) { + if (data.isDirect()) { rc = new Buffer(data.remaining()); data.get(rc.data); } else { rc = new Buffer(data); - data.position(data.position()+data.remaining()); + data.position(data.position() + data.remaining()); } return rc; } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java index 02e0c10..58b776f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java @@ -16,11 +16,11 @@ */ package org.apache.activemq.transport.amqp; -import org.apache.activemq.command.Command; - import java.io.IOException; import java.security.cert.X509Certificate; +import org.apache.activemq.command.Command; + /** * Basic interface that mediates between protocol converter and transport */ @@ -36,8 +36,6 @@ public interface AmqpTransport { public void onException(IOException error); -// public AmqpInactivityMonitor getInactivityMonitor(); - public AmqpWireFormat getWireFormat(); public void stop() throws Exception; @@ -49,6 +47,7 @@ public interface AmqpTransport { public boolean isTrace(); public IAmqpProtocolConverter getProtocolConverter(); + public void setProtocolConverter(IAmqpProtocolConverter protocolConverter); } http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java index 3cf72c9..e394c85 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.transport.amqp; +import java.util.HashMap; +import java.util.Map; + import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; @@ -25,9 +28,6 @@ import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.wireformat.WireFormat; -import java.util.HashMap; -import java.util.Map; - /** * A <a href="http://amqp.org/">AMQP</a> transport factory */ @@ -35,10 +35,12 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS private BrokerContext brokerContext = null; + @Override protected String getDefaultWireFormatType() { return "amqp"; } + @Override @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { transport = new AmqpTransportFilter(transport, format, brokerContext); @@ -46,6 +48,7 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS return super.compositeConfigure(transport, format, options); } + @Override public void setBrokerService(BrokerService brokerService) { this.brokerContext = brokerService.getBrokerContext(); } @@ -56,26 +59,13 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS transport = super.serverConfigure(transport, format, options); // strip off the mutex transport. - if( transport instanceof MutexTransport ) { - transport = ((MutexTransport)transport).getNext(); + if (transport instanceof MutexTransport) { + transport = ((MutexTransport) transport).getNext(); } -// MutexTransport mutex = transport.narrow(MutexTransport.class); -// if (mutex != null) { -// mutex.setSyncOnCommand(true); -// } + return transport; } -// @Override -// protected Transport createInactivityMonitor(Transport transport, WireFormat format) { -// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format); -// -// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class); -// filter.setInactivityMonitor(monitor); -// -// return monitor; -// } - @Override protected boolean isUseInactivityMonitor(Transport transport) { return false; http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index fe58cc6..0f0badb 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -16,22 +16,22 @@ */ package org.apache.activemq.transport.amqp; +import java.io.IOException; +import java.security.cert.X509Certificate; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportListener; -import org.apache.qpid.proton.jms.InboundTransformer; import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.wireformat.WireFormat; +import org.apache.qpid.proton.jms.InboundTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.security.cert.X509Certificate; -import java.util.concurrent.locks.ReentrantLock; - /** * The AMQPTransportFilter normally sits on top of a TcpTransport that has been * configured with the AmqpWireFormat and is used to convert AMQP commands to @@ -43,12 +43,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES"); static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES"); private IAmqpProtocolConverter protocolConverter; -// private AmqpInactivityMonitor monitor; private AmqpWireFormat wireFormat; private boolean trace; private String transformer = InboundTransformer.TRANSFORMER_NATIVE; - private ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantLock(); public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) { super(next); @@ -58,6 +57,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor } } + @Override public void oneway(Object o) throws IOException { try { final Command command = (Command) o; @@ -82,10 +82,12 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor } } + @Override public void sendToActiveMQ(IOException error) { super.onException(error); } + @Override public void onCommand(Object command) { try { if (trace) { @@ -104,6 +106,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor } } + @Override public void sendToActiveMQ(Command command) { assert lock.isHeldByCurrentThread(); TransportListener l = transportListener; @@ -112,6 +115,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor } } + @Override public void sendToAmqp(Object command) throws IOException { assert lock.isHeldByCurrentThread(); if (trace) { @@ -123,6 +127,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor } } + @Override public X509Certificate[] getPeerCertificates() { if (next instanceof SslTransport) { X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates(); @@ -134,6 +139,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor return null; } + @Override public boolean isTrace() { return trace; } @@ -143,15 +149,6 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor this.protocolConverter.updateTracer(); } -// @Override -// public AmqpInactivityMonitor getInactivityMonitor() { -// return monitor; -// } -// -// public void setInactivityMonitor(AmqpInactivityMonitor monitor) { -// this.monitor = monitor; -// } - @Override public AmqpWireFormat getWireFormat() { return this.wireFormat; @@ -161,6 +158,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor super.onException(e); } + @Override public String getTransformer() { return transformer; } @@ -168,10 +166,13 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor public void setTransformer(String transformer) { this.transformer = transformer; } + + @Override public IAmqpProtocolConverter getProtocolConverter() { return protocolConverter; } + @Override public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) { this.protocolConverter = protocolConverter; } http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java index 11d40fc..75856da 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java @@ -23,6 +23,8 @@ import org.apache.activemq.wireformat.WireFormatFactory; * Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol. */ public class AmqpWireFormatFactory implements WireFormatFactory { + + @Override public WireFormat createWireFormat() { return new AmqpWireFormat(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java index 0693a70..392ed77 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java @@ -16,10 +16,9 @@ */ package org.apache.activemq.transport.amqp; -import org.apache.activemq.command.Response; - import java.io.IOException; +import org.apache.activemq.command.Response; /** * Interface used by the AMQPProtocolConverter for callbacks.
