https://issues.apache.org/jira/browse/AMQ-5602
Functional client with added tests to start to cover various expectations of an AMQP broker and some tests for expectations of a JMS mapping compliant broker. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/72839b78 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/72839b78 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/72839b78 Branch: refs/heads/master Commit: 72839b78a727bdec96dbd0a824a9f39a745b4d87 Parents: 10c47d6 Author: Timothy Bish <[email protected]> Authored: Fri Mar 13 15:47:30 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Mar 13 15:47:30 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 92 +-- .../activemq/transport/amqp/AmqpSupport.java | 130 +++- .../transport/amqp/AmqpTestSupport.java | 48 +- .../amqp/client/AmqpAbstractResource.java | 314 ++++++++++ .../transport/amqp/client/AmqpClient.java | 240 ++++++++ .../amqp/client/AmqpClientListener.java | 32 + .../amqp/client/AmqpClientTestSupport.java | 77 +++ .../transport/amqp/client/AmqpConnection.java | 532 ++++++++++++++++ .../amqp/client/AmqpDefaultClientListener.java | 28 + .../amqp/client/AmqpJmsSelectorType.java | 47 ++ .../transport/amqp/client/AmqpMessage.java | 179 ++++++ .../transport/amqp/client/AmqpNoLocalType.java | 44 ++ .../transport/amqp/client/AmqpReceiver.java | 599 +++++++++++++++++++ .../transport/amqp/client/AmqpResource.java | 163 +++++ .../transport/amqp/client/AmqpSender.java | 382 ++++++++++++ .../transport/amqp/client/AmqpSession.java | 168 ++++++ .../amqp/client/AmqpStateInspector.java | 88 +++ .../amqp/client/AmqpTransferTagGenerator.java | 103 ++++ .../amqp/client/sasl/AbstractMechanism.java | 80 +++ .../amqp/client/sasl/AnonymousMechanism.java | 43 ++ .../amqp/client/sasl/CramMD5Mechanism.java | 86 +++ .../transport/amqp/client/sasl/Mechanism.java | 125 ++++ 
.../amqp/client/sasl/PlainMechanism.java | 62 ++ .../amqp/client/sasl/SaslAuthenticator.java | 163 +++++ .../transport/amqp/client/util/AsyncResult.java | 47 ++ .../amqp/client/util/ClientFuture.java | 102 ++++ .../amqp/client/util/ClientTcpTransport.java | 384 ++++++++++++ .../client/util/UnmodifiableConnection.java | 179 ++++++ .../amqp/client/util/UnmodifiableDelivery.java | 147 +++++ .../amqp/client/util/UnmodifiableLink.java | 248 ++++++++ .../amqp/client/util/UnmodifiableReceiver.java | 59 ++ .../amqp/client/util/UnmodifiableSender.java | 45 ++ .../amqp/client/util/UnmodifiableSession.java | 134 +++++ .../amqp/client/util/WrappedAsyncResult.java | 59 ++ .../amqp/interop/AmqpConnectionsTest.java | 170 ++++++ .../amqp/interop/AmqpReceiverTest.java | 285 +++++++++ .../transport/amqp/interop/AmqpSenderTest.java | 94 +++ .../transport/amqp/interop/AmqpSessionTest.java | 40 ++ 38 files changed, 5730 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index b9b2ff2..39c8c2b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -16,6 +16,20 @@ */ package org.apache.activemq.transport.amqp; +import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY; +import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; +import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; +import static 
org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX; +import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY; +import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY; +import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX; +import static org.apache.activemq.transport.amqp.AmqpSupport.contains; +import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; +import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes; +import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; + import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; @@ -90,7 +104,6 @@ import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.UnsignedLong; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Modified; @@ -136,19 +149,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class); private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; private static final int CHANNEL_MAX = 32767; - private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); - private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); - private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); - private static final Symbol COPY = Symbol.getSymbol("copy"); - private static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L); - private static final Symbol 
JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string"); - private static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[] { JMS_SELECTOR_CODE, JMS_SELECTOR_NAME }; - private static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L); - private static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:selector-filter:string"); - private static final Object[] NO_LOCAL_FILTER_IDS = new Object[] { NO_LOCAL_CODE, NO_LOCAL_NAME }; - private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue"); - private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic"); - private static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); private final AmqpTransport amqpTransport; private final AmqpWireFormat amqpWireFormat; @@ -874,17 +874,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private final AtomicLong nextTransactionId = new AtomicLong(); - public byte[] toBytes(long value) { - Buffer buffer = new Buffer(8); - buffer.bigEndianEditor().writeLong(value); - return buffer.data; - } - - private long toLong(Binary value) { - Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength()); - return buffer.bigEndianEditor().readLong(); - } - AmqpDeliveryListener coordinatorContext = new BaseProducerContext() { @Override @@ -946,7 +935,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { if (response.isException()) { ExceptionResponse er = (ExceptionResponse) response; Rejected rejected = new Rejected(); - rejected.setError(createErrorCondition("failed", er.getException().getMessage())); + rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage())); delivery.disposition(rejected); } else { delivery.disposition(Accepted.getInstance()); @@ -1639,46 +1628,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { }); } - private boolean 
contains(Symbol[] symbols, Symbol key) { - if (symbols == null || symbols.length == 0) { - return false; - } - - for (Symbol symbol : symbols) { - if (symbol.equals(key)) { - return true; - } - } - - return false; - } - - private DescribedType findFilter(Map<Symbol, Object> filters, Object[] filterIds) { - - if (filterIds == null || filterIds.length == 0) { - throw new IllegalArgumentException("Invalid Filter Ids array passed: " + filterIds); - } - - if (filters == null || filters.isEmpty()) { - return null; - } - - for (Object value : filters.values()) { - if (value instanceof DescribedType) { - DescribedType describedType = ((DescribedType) value); - Object descriptor = describedType.getDescriptor(); - - for (Object filterId : filterIds) { - if (descriptor.equals(filterId)) { - return describedType; - } - } - } - } - - return null; - } - // ////////////////////////////////////////////////////////////////////////// // // Implementation methods @@ -1707,17 +1656,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } - ErrorCondition createErrorCondition(String name) { - return createErrorCondition(name, ""); - } - - ErrorCondition createErrorCondition(String name, String description) { - ErrorCondition condition = new ErrorCondition(); - condition.setCondition(Symbol.valueOf(name)); - condition.setDescription(description); - return condition; - } - @Override public void setPrefetch(int prefetch) { this.prefetch = prefetch; http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java index 9a01f7b..c0cfb94 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java +++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -17,19 +17,116 @@ package org.apache.activemq.transport.amqp; import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedLong; import org.fusesource.hawtbuf.Buffer; /** - * + * Set of useful methods and definitions used in the AMQP protocol handling */ public class AmqpSupport { - static public Buffer toBuffer(ByteBuffer data) { + // Identification values (descriptor code and symbolic name) used to locate the JMS selector and no-local filter types. + public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L); + public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string"); + public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[] { JMS_SELECTOR_CODE, JMS_SELECTOR_NAME }; + public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L); + public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:no-local-filter:list"); + public static final Object[] NO_LOCAL_FILTER_IDS = new Object[] { NO_LOCAL_CODE, NO_LOCAL_NAME }; + + // Capabilities used to identify destination type in some requests. + public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue"); + public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic"); + + // Symbols used to announce connection information to remote peer. + public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); + public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); + public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); + + // Symbols used in configuration of newly opened links.
+ public static final Symbol COPY = Symbol.getSymbol("copy"); + + /** + * Search for a given Symbol in a given array of Symbol objects. + * + * @param symbols + * the set of Symbols to search. + * @param key + * the value to try and find in the Symbol array. + * + * @return true if the key is found in the given Symbol array. + */ + public static boolean contains(Symbol[] symbols, Symbol key) { + if (symbols == null || symbols.length == 0) { + return false; + } + + for (Symbol symbol : symbols) { + if (symbol.equals(key)) { + return true; + } + } + + return false; + } + + /** + * Search for a particular filter using a set of known identification values + * in the Map of filters. + * + * @param filters + * The filters map that should be searched. + * @param filterIds + * The aliases for the target filter to be located. + * + * @return the filter if found in the mapping or null if not found. + */ + public static DescribedType findFilter(Map<Symbol, Object> filters, Object[] filterIds) { + + if (filterIds == null || filterIds.length == 0) { + throw new IllegalArgumentException("Invalid Filter Ids array passed: " + filterIds); + } + + if (filters == null || filters.isEmpty()) { + return null; + } + + for (Object value : filters.values()) { + if (value instanceof DescribedType) { + DescribedType describedType = ((DescribedType) value); + Object descriptor = describedType.getDescriptor(); + + for (Object filterId : filterIds) { + if (descriptor.equals(filterId)) { + return describedType; + } + } + } + } + + return null; + } + + /** + * Conversion from Java ByteBuffer to a HawtBuf buffer. + * + * @param data + * the ByteBuffer instance to convert. + * + * @return a new HawtBuf buffer converted from the given ByteBuffer.
+ */ + public static Buffer toBuffer(ByteBuffer data) { if (data == null) { return null; } + Buffer rc; + if (data.isDirect()) { rc = new Buffer(data.remaining()); data.get(rc.data); @@ -37,6 +134,35 @@ public class AmqpSupport { rc = new Buffer(data); data.position(data.position() + data.remaining()); } + return rc; } + + /** + * Given a long value, convert it to a byte array for marshalling. + * + * @param value + * the value to convert. + * + * @return a new byte array that holds the big endian value of the long. + */ + public static byte[] toBytes(long value) { + Buffer buffer = new Buffer(8); + buffer.bigEndianEditor().writeLong(value); + return buffer.data; + } + + /** + * Converts a Binary value to a long assuming that the contained value is + * stored in Big Endian encoding. + * + * @param value + * the Binary object whose payload is converted to a long. + * + * @return a long value constructed from the bytes of the Binary instance. + */ + public static long toLong(Binary value) { + Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength()); + return buffer.bigEndianEditor().readLong(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index e20168c..33ae799 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -36,6 +36,7 @@ import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import 
org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.BrokerViewMBean; @@ -242,18 +243,47 @@ public class AmqpTestSupport { LOG.info("========== tearDown " + getTestName() + " =========="); } - public void sendMessages(Connection connection, Destination destination, int count) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer p = session.createProducer(destination); + public Connection createJMSConnection() throws JMSException { + if (!isUseOpenWireConnector()) { + throw new javax.jms.IllegalStateException("OpenWire TransportConnector was not configured."); + } + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireURI); + + return factory.createConnection(); + } - for (int i = 1; i <= count; i++) { - TextMessage message = session.createTextMessage(); - message.setText("TextMessage: " + i); - message.setIntProperty(MESSAGE_NUMBER, i); - p.send(message); + public void sendMessages(String destinationName, int count, boolean topic) throws Exception { + Connection connection = createJMSConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = null; + if (topic) { + destination = session.createTopic(destinationName); + } else { + destination = session.createQueue(destinationName); + } + + sendMessages(connection, destination, count); + } finally { + connection.close(); } + } - session.close(); + public void sendMessages(Connection connection, Destination destination, int count) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try { + MessageProducer p = session.createProducer(destination); + + for (int i = 1; i <= count; i++) { + TextMessage message = session.createTextMessage(); + message.setText("TextMessage: " + i); + message.setIntProperty(MESSAGE_NUMBER, i); + p.send(message); + } + } finally { + session.close(); + } } public 
String getTestName() { http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java new file mode 100644 index 0000000..8a5a587 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.io.IOException; + +import org.apache.activemq.transport.amqp.client.util.AsyncResult; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.EndpointState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base for all AmqpResource implementations to extend. 
+ * + * This abstract class wraps up the basic state management bits so that the concrete + * object don't have to reproduce it. Provides hooks for the subclasses to initialize + * and shutdown. + */ +public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpResource { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class); + + protected AsyncResult openRequest; + protected AsyncResult closeRequest; + + private AmqpStateInspector amqpStateInspector = new AmqpStateInspector(); + + private E endpoint; + + @Override + public void open(AsyncResult request) { + this.openRequest = request; + doOpen(); + getEndpoint().setContext(this); + } + + @Override + public boolean isOpen() { + return getEndpoint().getRemoteState() == EndpointState.ACTIVE; + } + + @Override + public void opened() { + if (this.openRequest != null) { + this.openRequest.onSuccess(); + this.openRequest = null; + } + } + + @Override + public void close(AsyncResult request) { + // If already closed signal success or else the caller might never get notified. + if (getEndpoint().getLocalState() == EndpointState.CLOSED || + getEndpoint().getRemoteState() == EndpointState.CLOSED) { + + if (getEndpoint().getLocalState() != EndpointState.CLOSED) { + // Remote already closed this resource, close locally and free. 
+ if (getEndpoint().getLocalState() != EndpointState.CLOSED) { + doClose(); + getEndpoint().free(); + } + } + + request.onSuccess(); + return; + } + + this.closeRequest = request; + doClose(); + } + + @Override + public boolean isClosed() { + return getEndpoint().getLocalState() == EndpointState.CLOSED; + } + + @Override + public void closed() { + getEndpoint().close(); + getEndpoint().free(); + + if (this.closeRequest != null) { + this.closeRequest.onSuccess(); + this.closeRequest = null; + } + } + + @Override + public void failed() { + failed(new Exception("Remote request failed.")); + } + + @Override + public void failed(Exception cause) { + if (openRequest != null) { + if (endpoint != null) { + // TODO: if this is a producer/consumer link then we may only be detached, + // rather than fully closed, and should respond appropriately. + endpoint.close(); + } + openRequest.onFailure(cause); + openRequest = null; + } + + if (closeRequest != null) { + closeRequest.onFailure(cause); + closeRequest = null; + } + } + + @Override + public void remotelyClosed(AmqpConnection connection) { + Exception error = getRemoteError(); + if (error == null) { + error = new IOException("Remote has closed without error information"); + } + + if (endpoint != null) { + // TODO: if this is a producer/consumer link then we may only be detached, + // rather than fully closed, and should respond appropriately. 
+ endpoint.close(); + } + + LOG.info("Resource {} was remotely closed", this); + + connection.fireClientException(error); + } + + public E getEndpoint() { + return this.endpoint; + } + + public void setEndpoint(E endpoint) { + this.endpoint = endpoint; + } + + public AmqpStateInspector getStateInspector() { + return amqpStateInspector; + } + + public void setStateInspector(AmqpStateInspector stateInspector) { + if (stateInspector == null) { + stateInspector = new AmqpStateInspector(); + } + + this.amqpStateInspector = stateInspector; + } + + public EndpointState getLocalState() { + if (getEndpoint() == null) { + return EndpointState.UNINITIALIZED; + } + return getEndpoint().getLocalState(); + } + + public EndpointState getRemoteState() { + if (getEndpoint() == null) { + return EndpointState.UNINITIALIZED; + } + return getEndpoint().getRemoteState(); + } + + @Override + public boolean hasRemoteError() { + return getEndpoint().getRemoteCondition() != null && getEndpoint().getRemoteCondition().getCondition() != null; + } + + @Override + public Exception getRemoteError() { + String message = getRemoteErrorMessage(); + Exception remoteError = null; + Symbol error = getEndpoint().getRemoteCondition() != null ? getEndpoint().getRemoteCondition().getCondition() : null; + if (error != null) { + if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) { + remoteError = new SecurityException(message); + } else { + remoteError = new Exception(message); + } + } + + return remoteError; + } + + @Override + public String getRemoteErrorMessage() { + String message = "Received unkown error from remote peer"; + if (getEndpoint().getRemoteCondition() != null) { + ErrorCondition error = getEndpoint().getRemoteCondition(); + if (error.getDescription() != null && !error.getDescription().isEmpty()) { + message = error.getDescription(); + } + } + + return message; + } + + @Override + public void processRemoteOpen(AmqpConnection connection) throws IOException { + doOpenInspection(); + doOpenCompletion(); + } + + @Override + public void processRemoteDetach(AmqpConnection connection) throws
IOException { + doDetachedInspection(); + if (isAwaitingClose()) { + LOG.debug("{} is now closed: ", this); + closed(); + } else { + remotelyClosed(connection); + } + } + + @Override + public void processRemoteClose(AmqpConnection connection) throws IOException { + doClosedInspection(); + if (isAwaitingClose()) { + LOG.debug("{} is now closed: ", this); + closed(); + } else if (isAwaitingOpen()) { + // Error on Open, create exception and signal failure. + LOG.warn("Open of {} failed: ", this); + Exception openError; + if (hasRemoteError()) { + openError = getRemoteError(); + } else { + openError = getOpenAbortException(); + } + + failed(openError); + } else { + remotelyClosed(connection); + } + } + + @Override + public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + } + + @Override + public void processFlowUpdates(AmqpConnection connection) throws IOException { + } + + /** + * Perform the open operation on the managed endpoint. A subclass may + * override this method to provide additional open actions or configuration + * updates. + */ + protected void doOpen() { + getEndpoint().open(); + } + + /** + * Perform the close operation on the managed endpoint. A subclass may + * override this method to provide additional close actions or alter the + * standard close path such as endpoint detach etc. + */ + protected void doClose() { + getEndpoint().close(); + } + + /** + * Complete the open operation on the managed endpoint. A subclass may + * override this method to provide additional verification actions or configuration + * updates. + */ + protected void doOpenCompletion() { + LOG.debug("{} is now open: ", this); + opened(); + } + + /** + * When aborting the open operation, and there isnt an error condition, + * provided by the peer, the returned exception will be used instead. + * A subclass may override this method to provide alternative behaviour. 
+ */ + protected Exception getOpenAbortException() { + return new IOException("Open failed unexpectedly."); + } + + // TODO - Fina a more generic way to do this. + protected abstract void doOpenInspection(); + protected abstract void doClosedInspection(); + + protected void doDetachedInspection() {} + + //----- Private implementation utility methods ---------------------------// + + private boolean isAwaitingOpen() { + return this.openRequest != null; + } + + private boolean isAwaitingClose() { + return this.closeRequest != null; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java new file mode 100644 index 0000000..0b299e4 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.amqp.client; + +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport; +import org.apache.qpid.proton.amqp.Symbol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Connection instance used to connect to the Broker using Proton as + * the AMQP protocol handler. + */ +public class AmqpClient { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpClient.class); + + private final String username; + private final String password; + private final URI remoteURI; + + private AmqpStateInspector stateInspector = new AmqpStateInspector(); + private List<Symbol> offeredCapabilities = Collections.emptyList(); + private Map<Symbol, Object> offeredProperties = Collections.emptyMap(); + + /** + * Creates an AmqpClient instance which can be used as a factory for connections. + * + * @param remoteURI + * The address of the remote peer to connect to. + * @param username + * The user name to use when authenticating the client. + * @param password + * The password to use when authenticating the client. + */ + public AmqpClient(URI remoteURI, String username, String password) { + this.remoteURI = remoteURI; + this.password = password; + this.username = username; + } + + /** + * Creates a connection with the broker at the given location, this method initiates a + * connect attempt immediately and will fail if the remote peer cannot be reached. + * + * @returns a new connection object used to interact with the connected peer. + * + * @throws Exception if an error occurs attempting to connect to the Broker. 
+ */ + public AmqpConnection connect() throws Exception { + + AmqpConnection connection = createConnection(); + + LOG.debug("Attempting to create new connection to peer: {}", remoteURI); + connection.connect(); + + return connection; + } + + /** + * Creates a connection object using the configured values for user, password, remote URI + * etc. This method does not immediately initiate a connection to the remote leaving that + * to the caller which provides a connection object that can have additional configuration + * changes applied before the <code>connect</code> method is invoked. + * + * @returns a new connection object used to interact with the connected peer. + * + * @throws Exception if an error occurs attempting to connect to the Broker. + */ + public AmqpConnection createConnection() throws Exception { + if (username == null && password != null) { + throw new IllegalArgumentException("Password must be null if user name value is null"); + } + + ClientTcpTransport transport = null; + + if (remoteURI.getScheme().equals("tcp")) { + transport = new ClientTcpTransport(remoteURI); + } else { + throw new IllegalArgumentException("Client only support TCP currently."); + } + + AmqpConnection connection = new AmqpConnection(transport, username, password); + + connection.setOfferedCapabilities(getOfferedCapabilities()); + connection.setOfferedProperties(getOfferedProperties()); + connection.setStateInspector(getStateInspector()); + + return connection; + } + + /** + * @return the user name value given when connect was called, always null before connect. + */ + public String getUsername() { + return username; + } + + /** + * @return the password value given when connect was called, always null before connect. + */ + public String getPassword() { + return password; + } + + /** + * @return the currently set address to use to connect to the AMQP peer. 
+ */ + public URI getRemoteURI() { + return remoteURI; + } + + /** + * Sets the offered capabilities that should be used when a new connection attempt + * is made. + * + * @param offeredCapabilities + * the list of capabilities to offer when connecting. + */ + public void setOfferedCapabilities(List<Symbol> offeredCapabilities) { + if (offeredCapabilities == null) { + offeredCapabilities = Collections.emptyList(); + } + + this.offeredCapabilities = offeredCapabilities; + } + + /** + * @return an unmodifiable view of the currently set offered capabilities + */ + public List<Symbol> getOfferedCapabilities() { + return Collections.unmodifiableList(offeredCapabilities); + } + + /** + * Sets the offered connection properties that should be used when a new connection + * attempt is made. + * + * @param offeredProperties + * the map of properties to offer when connecting. + */ + public void setOfferedProperties(Map<Symbol, Object> offeredProperties) { + if (offeredProperties == null) { + offeredProperties = Collections.emptyMap(); + } + + this.offeredProperties = offeredProperties; + } + + /** + * @return an unmodifiable view of the currently set connection properties. + */ + public Map<Symbol, Object> getOfferedProperties() { + return Collections.unmodifiableMap(offeredProperties); + } + + /** + * @return the currently set state inspector used to check state after various events. + */ + public AmqpStateInspector getStateInspector() { + return stateInspector; + } + + /** + * Sets the state inspector used to check that the AMQP resource is valid after + * specific lifecycle events such as open and close. + * + * @param stateInspector + * the new state inspector to use.
+ */ + public void setStateInspector(AmqpStateInspector stateInspector) { + if (stateInspector == null) { + stateInspector = new AmqpStateInspector(); + } + + this.stateInspector = stateInspector; + } + + @Override + public String toString() { + return "AmqpClient: " + getRemoteURI().getHost() + ":" + getRemoteURI().getPort(); + } + + /** + * Creates an anonymous connection with the broker at the given location. + * + * @param broker + * the address of the remote broker instance. + * + * @returns a new connection object used to interact with the connected peer. + * + * @throws Exception if an error occurs attempting to connect to the Broker. + */ + public static AmqpConnection connect(URI broker) throws Exception { + return connect(broker, null, null); + } + + /** + * Creates a connection with the broker at the given location. + * + * @param broker + * the address of the remote broker instance. + * @param username + * the user name to use to connect to the broker or null for anonymous. + * @param password + * the password to use to connect to the broker, must be null if user name is null. + * + * @returns a new connection object used to interact with the connected peer. + * + * @throws Exception if an error occurs attempting to connect to the Broker. 
+ */ + public static AmqpConnection connect(URI broker, String username, String password) throws Exception { + if (username == null && password != null) { + throw new IllegalArgumentException("Password must be null if user name value is null"); + } + + AmqpClient client = new AmqpClient(broker, username, password); + + return client.connect(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java new file mode 100644 index 0000000..3df7cf4 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +/** + * Events points exposed by the AmqpClient object. + */ +public interface AmqpClientListener { + + /** + * Indicates some error has occurred during client operations. 
+ * + * @param ex + * The error that triggered this event. + */ + void onClientException(Throwable ex); + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java new file mode 100644 index 0000000..4d3f571 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.net.URI; + +import org.apache.activemq.transport.amqp.AmqpTestSupport; + +/** + * Test support class for tests that will be using the AMQP Proton wrapper client. 
+ */ +public class AmqpClientTestSupport extends AmqpTestSupport { + + public String getAmqpConnectionURIOptions() { + return ""; + } + + public URI getBrokerAmqpConnectionURI() { + try { + String uri = "tcp://127.0.0.1:" + amqpPort; + + if (!getAmqpConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getAmqpConnectionURIOptions(); + } + + return new URI(uri); + } catch (Exception e) { + throw new RuntimeException(); + } + } + + public AmqpConnection createAmqpConnection() throws Exception { + return createAmqpConnection(getBrokerAmqpConnectionURI()); + } + + public AmqpConnection createAmqpConnection(String username, String password) throws Exception { + return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password); + } + + public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception { + return createAmqpConnection(brokerURI, null, null); + } + + public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception { + return createAmqpClient(brokerURI, username, password).connect(); + } + + public AmqpClient createAmqpClient() throws Exception { + return createAmqpClient(getBrokerAmqpConnectionURI(), null, null); + } + + public AmqpClient createAmqpClient(URI brokerURI) throws Exception { + return createAmqpClient(brokerURI, null, null); + } + + public AmqpClient createAmqpClient(String username, String password) throws Exception { + return createAmqpClient(getBrokerAmqpConnectionURI(), username, password); + } + + public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception { + return new AmqpClient(brokerURI, username, password); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java ---------------------------------------------------------------------- diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java new file mode 100644 index 0000000..a98f711 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -0,0 +1,532 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.amqp.client; + +import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator; +import org.apache.activemq.transport.amqp.client.util.ClientFuture; +import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection; +import org.apache.activemq.util.IdGenerator; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Event.Type; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.impl.CollectorImpl; +import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AmqpConnection extends AmqpAbstractResource<Connection> implements ClientTcpTransport.TransportListener { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); + + private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1; + // NOTE: Limit default channel max to signed short range to deal with + // brokers that don't currently handle the unsigned range well. 
+ private static final int DEFAULT_CHANNEL_MAX = 32767; + private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); + + public static final long DEFAULT_CONNECT_TIMEOUT = 15000; + public static final long DEFAULT_CLOSE_TIMEOUT = 30000; + + private final ScheduledExecutorService serializer; + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicLong sessionIdGenerator = new AtomicLong(); + private final Collector protonCollector = new CollectorImpl(); + private final ClientTcpTransport transport; + private final Transport protonTransport = Transport.Factory.create(); + + private final String username; + private final String password; + private final URI remoteURI; + private final String connectionId; + private List<Symbol> offeredCapabilities = Collections.emptyList(); + private Map<Symbol, Object> offeredProperties = Collections.emptyMap(); + + private AmqpClientListener listener; + private SaslAuthenticator authenticator; + + private String containerId; + private boolean authenticated; + private int channelMax = DEFAULT_CHANNEL_MAX; + private long connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; + + public AmqpConnection(ClientTcpTransport transport, String username, String password) { + setEndpoint(Connection.Factory.create()); + getEndpoint().collect(protonCollector); + + this.transport = transport; + this.username = username; + this.password = password; + this.connectionId = CONNECTION_ID_GENERATOR.generateId(); + this.remoteURI = transport.getRemoteURI(); + + this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + + @Override + public Thread newThread(Runnable runner) { + Thread serial = new Thread(runner); + serial.setDaemon(true); + serial.setName(toString()); + return serial; + } + }); + + this.transport.setTransportListener(this); + } + + public void connect() throws 
Exception { + if (connected.compareAndSet(false, true)) { + transport.connect(); + + final ClientFuture future = new ClientFuture(); + serializer.execute(new Runnable() { + @Override + public void run() { + getEndpoint().setContainer(safeGetContainerId()); + getEndpoint().setHostname(remoteURI.getHost()); + if (!getOfferedCapabilities().isEmpty()) { + getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0])); + } + if (!getOfferedProperties().isEmpty()) { + getEndpoint().setProperties(getOfferedProperties()); + } + + protonTransport.setMaxFrameSize(getMaxFrameSize()); + protonTransport.setChannelMax(getChannelMax()); + protonTransport.bind(getEndpoint()); + Sasl sasl = protonTransport.sasl(); + if (sasl != null) { + sasl.client(); + } + authenticator = new SaslAuthenticator(sasl, username, password); + open(future); + + pumpToProtonTransport(); + } + }); + + if (connectTimeout <= 0) { + future.sync(); + } else { + future.sync(connectTimeout, TimeUnit.MILLISECONDS); + } + } + } + + public boolean isConnected() { + return transport.isConnected() && connected.get(); + } + + public void close() { + if (closed.compareAndSet(false, true)) { + final ClientFuture request = new ClientFuture(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + + // If we are not connected then there is nothing we can do now + // just signal success. 
+ if (!transport.isConnected()) { + request.onSuccess(); + } + + if (getEndpoint() != null) { + close(request); + } else { + request.onSuccess(); + } + + pumpToProtonTransport(); + } catch (Exception e) { + LOG.debug("Caught exception while closing proton connection"); + } + } + }); + + try { + if (closeTimeout <= 0) { + request.sync(); + } else { + request.sync(closeTimeout, TimeUnit.MILLISECONDS); + } + } catch (IOException e) { + LOG.warn("Error caught while closing Provider: ", e.getMessage()); + } finally { + if (transport != null) { + try { + transport.close(); + } catch (Exception e) { + LOG.debug("Cuaght exception while closing down Transport: {}", e.getMessage()); + } + } + + serializer.shutdown(); + } + } + } + + /** + * Creates a new Session instance used to create AMQP resources like + * senders and receivers. + * + * @return a new AmqpSession that can be used to create links. + * + * @throws Exception if an error occurs during creation. + */ + public AmqpSession createSession() throws Exception { + checkClosed(); + + final AmqpSession session = new AmqpSession(AmqpConnection.this, getNextSessionId()); + final ClientFuture request = new ClientFuture(); + + serializer.execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + + session.setEndpoint(getEndpoint().session()); + session.open(request); + + pumpToProtonTransport(); + } + }); + + request.sync(); + + return session; + } + + //----- Configuration accessors ------------------------------------------// + + /** + * @return the user name that was used to authenticate this connection. + */ + public String getUsername() { + return username; + } + + /** + * @return the password that was used to authenticate this connection. + */ + public String getPassword() { + return password; + } + + /** + * @return the URI of the remote peer this connection attached to. 
+ */ + public URI getRemoteURI() { + return remoteURI; + } + + /** + * @return the container ID that will be set as the container Id. + */ + public String getContainerId() { + return this.containerId; + } + + /** + * Sets the container Id that will be configured on the connection prior to + * connecting to the remote peer. Calling this after connect has no effect. + * + * @param containerId + * the container Id to use on the connection. + */ + public void setContainerId(String containerId) { + this.containerId = containerId; + } + + /** + * @return the currently set Max Frame Size value. + */ + public int getMaxFrameSize() { + return DEFAULT_MAX_FRAME_SIZE; + } + + public int getChannelMax() { + return channelMax; + } + + public void setChannelMax(int channelMax) { + this.channelMax = channelMax; + } + + public long getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(long connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public long getCloseTimeout() { + return closeTimeout; + } + + public void setCloseTimeout(long closeTimeout) { + this.closeTimeout = closeTimeout; + } + + public List<Symbol> getOfferedCapabilities() { + return offeredCapabilities; + } + + public void setOfferedCapabilities(List<Symbol> offeredCapabilities) { + if (offeredCapabilities != null) { + offeredCapabilities = Collections.emptyList(); + } + + this.offeredCapabilities = offeredCapabilities; + } + + public Map<Symbol, Object> getOfferedProperties() { + return offeredProperties; + } + + public void setOfferedProperties(Map<Symbol, Object> offeredProperties) { + if (offeredProperties != null) { + offeredProperties = Collections.emptyMap(); + } + + this.offeredProperties = offeredProperties; + } + + public Connection getConnection() { + return new UnmodifiableConnection(getEndpoint()); + } + + //----- Internal getters used from the child AmqpResource classes --------// + + ScheduledExecutorService getScheduler() { + return this.serializer; + } + 
+ Connection getProtonConnection() { + return getEndpoint(); + } + + void pumpToProtonTransport() { + try { + boolean done = false; + while (!done) { + ByteBuffer toWrite = protonTransport.getOutputBuffer(); + if (toWrite != null && toWrite.hasRemaining()) { + transport.send(toWrite); + protonTransport.outputConsumed(); + } else { + done = true; + } + } + } catch (IOException e) { + fireClientException(e); + } + } + + //----- Transport listener event hooks -----------------------------------// + + @Override + public void onData(final Buffer input) { + serializer.execute(new Runnable() { + + @Override + public void run() { + ByteBuffer source = input.toByteBuffer(); + LOG.trace("Received from Broker {} bytes:", source.remaining()); + + do { + ByteBuffer buffer = protonTransport.getInputBuffer(); + int limit = Math.min(buffer.remaining(), source.remaining()); + ByteBuffer duplicate = source.duplicate(); + duplicate.limit(source.position() + limit); + buffer.put(duplicate); + protonTransport.processInput(); + source.position(source.position() + limit); + } while (source.hasRemaining()); + + // Process the state changes from the latest data and then answer back + // any pending updates to the Broker. + processUpdates(); + pumpToProtonTransport(); + } + }); + } + + @Override + public void onTransportClosed() { + LOG.debug("The transport has unexpectedly closed"); + } + + @Override + public void onTransportError(Throwable cause) { + fireClientException(cause); + } + + //----- Internal implementation ------------------------------------------// + + @Override + protected void doOpenCompletion() { + // If the remote indicates that a close is pending, don't open. 
+ if (!getEndpoint().getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { + super.doOpenCompletion(); + } + } + + @Override + protected void doOpenInspection() { + getStateInspector().inspectOpenedResource(getConnection()); + } + + @Override + protected void doClosedInspection() { + getStateInspector().inspectClosedResource(getConnection()); + } + + protected void fireClientException(Throwable ex) { + AmqpClientListener listener = this.listener; + if (listener != null) { + listener.onClientException(ex); + } + } + + protected void checkClosed() throws IllegalStateException { + if (closed.get()) { + throw new IllegalStateException("The Connection is already closed"); + } + } + + private void processUpdates() { + try { + Event protonEvent = null; + while ((protonEvent = protonCollector.peek()) != null) { + if (!protonEvent.getType().equals(Type.TRANSPORT)) { + LOG.trace("New Proton Event: {}", protonEvent.getType()); + } + + AmqpResource amqpResource = null; + switch (protonEvent.getType()) { + case CONNECTION_REMOTE_CLOSE: + amqpResource = (AmqpConnection) protonEvent.getConnection().getContext(); + amqpResource.processRemoteClose(this); + break; + case CONNECTION_REMOTE_OPEN: + amqpResource = (AmqpConnection) protonEvent.getConnection().getContext(); + amqpResource.processRemoteOpen(this); + break; + case SESSION_REMOTE_CLOSE: + amqpResource = (AmqpSession) protonEvent.getSession().getContext(); + amqpResource.processRemoteClose(this); + break; + case SESSION_REMOTE_OPEN: + amqpResource = (AmqpSession) protonEvent.getSession().getContext(); + amqpResource.processRemoteOpen(this); + break; + case LINK_REMOTE_CLOSE: + amqpResource = (AmqpResource) protonEvent.getLink().getContext(); + amqpResource.processRemoteClose(this); + break; + case LINK_REMOTE_DETACH: + amqpResource = (AmqpResource) protonEvent.getLink().getContext(); + amqpResource.processRemoteDetach(this); + break; + case LINK_REMOTE_OPEN: + amqpResource = (AmqpResource) 
protonEvent.getLink().getContext(); + amqpResource.processRemoteOpen(this); + break; + case LINK_FLOW: + amqpResource = (AmqpResource) protonEvent.getLink().getContext(); + amqpResource.processFlowUpdates(this); + break; + case DELIVERY: + amqpResource = (AmqpResource) protonEvent.getLink().getContext(); + amqpResource.processDeliveryUpdates(this); + break; + default: + break; + } + + protonCollector.pop(); + } + + // We have to do this to pump SASL bytes in as SASL is not event driven yet. + if (!authenticated) { + processSaslAuthentication(); + } + } catch (Exception ex) { + LOG.warn("Caught Exception during update processing: {}", ex.getMessage(), ex); + fireClientException(ex); + } + } + + private void processSaslAuthentication() { + if (authenticated || authenticator == null) { + return; + } + + try { + if (authenticator.authenticate()) { + authenticator = null; + authenticated = true; + } + } catch (SecurityException ex) { + failed(ex); + } + } + + private String getNextSessionId() { + return connectionId + ":" + sessionIdGenerator.incrementAndGet(); + } + + private String safeGetContainerId() { + String containerId = getContainerId(); + if (containerId == null || containerId.isEmpty()) { + containerId = UUID.randomUUID().toString(); + } + + return containerId; + } + + @Override + public String toString() { + return "AmqpConnection { " + connectionId + " }"; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java new file mode 100644 index 0000000..9b2394c --- /dev/null +++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+/**
+ * Default listener implementation that stubs out all the event methods, so that a
+ * subclass only needs to override the events it is interested in.
+ */
+public class AmqpDefaultClientListener implements AmqpClientListener {
+
+    /**
+     * Does nothing, the reported error is ignored.
+     *
+     * @param ex
+     *        the error that triggered this event.
+     */
+    @Override
+    public void onClientException(Throwable ex) {
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
new file mode 100644
index 0000000..d93e052
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.UnsignedLong; + +/** + * A Described Type wrapper for JMS selector values. + */ +public class AmqpJmsSelectorType implements DescribedType { + + private final String selector; + + public AmqpJmsSelectorType(String selector) { + this.selector = selector; + } + + @Override + public Object getDescriptor() { + return UnsignedLong.valueOf(0x0000468C00000004L); + } + + @Override + public Object getDescribed() { + return this.selector; + } + + @Override + public String toString() { + return "AmqpJmsSelectorType{" + selector + "}"; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java new file mode 100644 index 0000000..52e5eaf --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery; +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.message.Message; + +public class AmqpMessage { + + private final AmqpReceiver receiver; + private final Message message; + private final Delivery delivery; + + /** + * Creates a new AmqpMessage that wraps the information necessary to handle + * an outgoing message. + */ + public AmqpMessage() { + receiver = null; + delivery = null; + + message = Proton.message(); + message.setDurable(true); + } + + /** + * Creates a new AmqpMessage that wraps the information necessary to handle + * an outgoing message. + * + * @param message + * the Proton message that is to be sent. + */ + public AmqpMessage(Message message) { + this(null, message, null); + } + + /** + * Creates a new AmqpMessage that wraps the information necessary to handle + * an incoming delivery. + * + * @param receiver + * the AmqpReceiver that received this message. + * @param message + * the Proton message that was received. 
+ * @param delivery + * the Delivery instance that produced this message. + */ + public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) { + this.receiver = receiver; + this.message = message; + this.delivery = delivery; + } + + /** + * Accepts the message marking it as consumed on the remote peer. + * + * @throws Exception if an error occurs during the accept. + */ + public void accept() throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't accept non-received message."); + } + + receiver.accept(delivery); + } + + /** + * Rejects the message, marking it as not deliverable here and failed to deliver. + * + * @throws Exception if an error occurs during the reject. + */ + public void reject() throws Exception { + reject(true, true); + } + + /** + * Rejects the message, marking it as failed to deliver and applying the given value + * to the undeliverable here tag. + * + * @param undeliverableHere + * marks the delivery as not being able to be process by link it was sent to. + * + * @throws Exception if an error occurs during the reject. + */ + public void reject(boolean undeliverableHere) throws Exception { + reject(undeliverableHere, true); + } + + /** + * Rejects the message, marking it as not deliverable here and failed to deliver. + * + * @param undeliverableHere + * marks the delivery as not being able to be process by link it was sent to. + * @param deliveryFailed + * indicates that the delivery failed for some reason. + * + * @throws Exception if an error occurs during the reject. + */ + public void reject(boolean undeliverableHere, boolean deliveryFailed) throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't reject non-received message."); + } + + receiver.reject(delivery, undeliverableHere, deliveryFailed); + } + + /** + * Release the message, remote can redeliver it elsewhere. + * + * @throws Exception if an error occurs during the reject. 
+ */ + public void release() throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't release non-received message."); + } + + receiver.release(delivery); + } + + /** + * @return the AMQP Delivery object linked to a received message. + */ + public Delivery getWrappedDelivery() { + if (delivery != null) { + return new UnmodifiableDelivery(delivery); + } + + return null; + } + + /** + * @return the AMQP Message that is wrapped by this object. + */ + public Message getWrappedMessage() { + return message; + } + + /** + * @return the AmqpReceiver that consumed this message. + */ + public AmqpReceiver getAmqpReceiver() { + return receiver; + } + + /** + * Sets a String value into the body of an outgoing Message, throws + * an exception if this is an incoming message instance. + * + * @param value + * the String value to store in the Message body. + * + * @throws IllegalStateException if the message is read only. + */ + public void setText(String value) throws IllegalStateException { + if (delivery != null) { + throw new IllegalStateException("Message is read only."); + } + + AmqpValue body = new AmqpValue(value); + getWrappedMessage().setBody(body); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java new file mode 100644 index 0000000..2d61b83 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.UnsignedLong; + +/** + * A Described Type wrapper for JMS no local option for MessageConsumer. + */ +public class AmqpNoLocalType implements DescribedType { + + public static final AmqpNoLocalType NO_LOCAL = new AmqpNoLocalType(); + + private final String noLocal; + + public AmqpNoLocalType() { + this.noLocal = "NoLocalFilter{}"; + } + + @Override + public Object getDescriptor() { + return UnsignedLong.valueOf(0x0000468C00000003L); + } + + @Override + public Object getDescribed() { + return this.noLocal; + } +}
