ARTEMIS-637 Port 5.x AMQP test client

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/df41a60e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/df41a60e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/df41a60e

Branch: refs/heads/master
Commit: df41a60e21783f33f435ef3a9efa54f9dab146d7
Parents: 5695164
Author: Martyn Taylor <[email protected]>
Authored: Fri Jul 15 18:03:31 2016 +0100
Committer: Andy Taylor <[email protected]>
Committed: Wed Jul 20 10:33:44 2016 +0100

----------------------------------------------------------------------
 tests/artemis-test-support/pom.xml              |  57 ++
 .../transport/amqp/AmqpProtocolException.java   |  62 ++
 .../activemq/transport/amqp/AmqpSupport.java    | 206 ++++
 .../amqp/client/AmqpAbstractResource.java       | 321 +++++++
 .../transport/amqp/client/AmqpClient.java       | 245 +++++
 .../transport/amqp/client/AmqpConnection.java   | 720 ++++++++++++++
 .../amqp/client/AmqpConnectionListener.java     |  31 +
 .../client/AmqpDefaultConnectionListener.java   |  28 +
 .../transport/amqp/client/AmqpEventSink.java    |  69 ++
 .../amqp/client/AmqpJmsSelectorFilter.java      |  48 +
 .../transport/amqp/client/AmqpMessage.java      | 515 ++++++++++
 .../amqp/client/AmqpNoLocalFilter.java          |  45 +
 .../transport/amqp/client/AmqpReceiver.java     | 946 +++++++++++++++++++
 .../amqp/client/AmqpRedirectedException.java    |  61 ++
 .../transport/amqp/client/AmqpResource.java     | 108 +++
 .../transport/amqp/client/AmqpSender.java       | 452 +++++++++
 .../transport/amqp/client/AmqpSession.java      | 454 +++++++++
 .../transport/amqp/client/AmqpSupport.java      | 195 ++++
 .../amqp/client/AmqpTransactionContext.java     | 261 +++++
 .../amqp/client/AmqpTransactionCoordinator.java | 262 +++++
 .../amqp/client/AmqpTransactionId.java          |  98 ++
 .../amqp/client/AmqpTransferTagGenerator.java   | 104 ++
 .../amqp/client/AmqpUnknownFilterType.java      |  49 +
 .../transport/amqp/client/AmqpValidator.java    | 101 ++
 .../amqp/client/sasl/AbstractMechanism.java     |  97 ++
 .../amqp/client/sasl/AnonymousMechanism.java    |  43 +
 .../amqp/client/sasl/CramMD5Mechanism.java      |  94 ++
 .../transport/amqp/client/sasl/Mechanism.java   | 143 +++
 .../amqp/client/sasl/PlainMechanism.java        |  76 ++
 .../amqp/client/sasl/SaslAuthenticator.java     | 182 ++++
 .../client/transport/NettyTcpTransport.java     | 402 ++++++++
 .../amqp/client/transport/NettyTransport.java   |  52 +
 .../client/transport/NettyTransportFactory.java |  80 ++
 .../transport/NettyTransportListener.java       |  46 +
 .../client/transport/NettyTransportOptions.java | 177 ++++
 .../transport/NettyTransportSslOptions.java     | 284 ++++++
 .../client/transport/NettyTransportSupport.java | 288 ++++++
 .../amqp/client/transport/NettyWSTransport.java | 472 +++++++++
 .../PartialPooledByteBufAllocator.java          | 134 +++
 .../client/transport/X509AliasKeyManager.java   |  86 ++
 .../transport/amqp/client/util/AsyncResult.java |  46 +
 .../amqp/client/util/ClientFuture.java          | 110 +++
 .../util/ClientFutureSynchronization.java       |  30 +
 .../amqp/client/util/IOExceptionSupport.java    |  45 +
 .../transport/amqp/client/util/IdGenerator.java | 274 ++++++
 .../amqp/client/util/NoOpAsyncResult.java       |  40 +
 .../amqp/client/util/PropertyUtil.java          | 533 +++++++++++
 .../amqp/client/util/StringArrayConverter.java  |  64 ++
 .../amqp/client/util/TypeConversionSupport.java | 218 +++++
 .../client/util/UnmodifiableConnection.java     | 202 ++++
 .../amqp/client/util/UnmodifiableDelivery.java  | 170 ++++
 .../amqp/client/util/UnmodifiableLink.java      | 276 ++++++
 .../amqp/client/util/UnmodifiableReceiver.java  |  59 ++
 .../amqp/client/util/UnmodifiableSender.java    |  45 +
 .../amqp/client/util/UnmodifiableSession.java   | 150 +++
 .../amqp/client/util/UnmodifiableTransport.java | 274 ++++++
 .../amqp/client/util/WrappedAsyncResult.java    |  59 ++
 tests/integration-tests/pom.xml                 |   5 +
 tests/pom.xml                                   |   8 +
 59 files changed, 10702 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/pom.xml
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/pom.xml 
b/tests/artemis-test-support/pom.xml
new file mode 100644
index 0000000..ec0c49d
--- /dev/null
+++ b/tests/artemis-test-support/pom.xml
@@ -0,0 +1,57 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+   <parent>
+      <groupId>org.apache.activemq.tests</groupId>
+      <artifactId>artemis-tests-pom</artifactId>
+      <version>1.4.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>artemis-test-support</artifactId>
+   <packaging>jar</packaging>
+   <name>ActiveMQ Artemis Test Support</name>
+
+   <properties>
+      <activemq.basedir>${project.basedir}/../..</activemq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>proton-j</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>qpid-jms-client</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.slf4j</groupId>
+         <artifactId>slf4j-api</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>io.netty</groupId>
+         <artifactId>netty-all</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-client</artifactId>
+      </dependency>
+   </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
new file mode 100644
index 0000000..6e58417
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.io.IOException;
+
+/**
+ * IOException subtype that carries an AMQP symbolic error name and a flag
+ * telling whether the failure should be treated as fatal.
+ */
+public class AmqpProtocolException extends IOException {
+
+   private static final long serialVersionUID = -2869735532997332242L;
+
+   private final String symbolicName;
+   private final boolean fatal;
+
+   /**
+    * Creates a non-fatal exception with no message and the default
+    * symbolic name "error".
+    */
+   public AmqpProtocolException() {
+      this(null);
+   }
+
+   /**
+    * Creates a non-fatal exception with the default symbolic name "error".
+    *
+    * @param s the detail message.
+    */
+   public AmqpProtocolException(String s) {
+      this(s, false);
+   }
+
+   /**
+    * Creates an exception with the default symbolic name "error".
+    *
+    * @param s     the detail message.
+    * @param fatal true if the error is to be reported as fatal.
+    */
+   public AmqpProtocolException(String s, boolean fatal) {
+      this(s, fatal, null);
+   }
+
+   /**
+    * Creates a non-fatal exception with an explicit symbolic name.
+    *
+    * Note the argument order: the first String is the symbolic name,
+    * the second one is the detail message.
+    *
+    * @param s   the symbolic name of the error, e.g. "amqp:invalid-field".
+    * @param msg the detail message.
+    */
+   public AmqpProtocolException(String s, String msg) {
+      this(s, msg, false, null);
+   }
+
+   /**
+    * Creates an exception with the default symbolic name "error".
+    *
+    * @param s     the detail message.
+    * @param fatal true if the error is to be reported as fatal.
+    * @param cause the underlying cause, may be null.
+    */
+   public AmqpProtocolException(String s, boolean fatal, Throwable cause) {
+      this("error", s, fatal, cause);
+   }
+
+   /**
+    * Creates a fully specified exception.
+    *
+    * @param symbolicName the symbolic name of the error.
+    * @param s            the detail message.
+    * @param fatal        true if the error is to be reported as fatal.
+    * @param cause        the underlying cause, may be null.
+    */
+   public AmqpProtocolException(String symbolicName, String s, boolean fatal, Throwable cause) {
+      // Hand the cause to the superclass constructor instead of calling the
+      // overridable initCause() from within a constructor.
+      super(s, cause);
+      this.symbolicName = symbolicName;
+      this.fatal = fatal;
+   }
+
+   /**
+    * @return true if this error was flagged as fatal when it was created.
+    */
+   public boolean isFatal() {
+      return fatal;
+   }
+
+   /**
+    * @return the symbolic name given at construction, "error" when none was supplied.
+    */
+   public String getSymbolicName() {
+      return symbolicName;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
new file mode 100644
index 0000000..cde4def
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.nio.ByteBuffer;
+import java.util.AbstractMap;
+import java.util.Map;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * Set of useful methods and definitions used in the AMQP protocol handling.
+ */
+public class AmqpSupport {
+
+   // Identification values used to locate JMS selector types.
+   public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
+   public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
+   public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[]{JMS_SELECTOR_CODE, JMS_SELECTOR_NAME};
+   public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
+   public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:no-local-filter:list");
+   public static final Object[] NO_LOCAL_FILTER_IDS = new Object[]{NO_LOCAL_CODE, NO_LOCAL_NAME};
+
+   // Capabilities used to identify destination type in some requests.
+   public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
+   public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
+
+   // Symbols naming an invalid field and the container-id field.
+   // NOTE(review): presumably used in error information sent to the peer; confirm against callers.
+   public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
+   public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id");
+
+   // Symbols used to announce connection information to remote peer.
+   public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+   public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+   public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
+   public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
+   public static final Symbol PRODUCT = Symbol.valueOf("product");
+   public static final Symbol VERSION = Symbol.valueOf("version");
+   public static final Symbol PLATFORM = Symbol.valueOf("platform");
+
+   // Symbols used in configuration of newly opened links.
+   public static final Symbol COPY = Symbol.getSymbol("copy");
+
+   // Lifetime policy symbols
+   public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+
+   /**
+    * Search for a given Symbol in a given array of Symbol objects.
+    *
+    * Null elements in the array are skipped rather than dereferenced.
+    *
+    * @param symbols the set of Symbols to search, may be null or empty.
+    * @param key     the value to try and find in the Symbol array.
+    * @return true if the key is found in the given Symbol array.
+    */
+   public static boolean contains(Symbol[] symbols, Symbol key) {
+      if (symbols == null || symbols.length == 0) {
+         return false;
+      }
+
+      for (Symbol symbol : symbols) {
+         // Guard against null entries so a sparse array cannot trigger an NPE.
+         if (symbol != null && symbol.equals(key)) {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
+   /**
+    * Search for a particular filter using a set of known identification values
+    * in the Map of filters.
+    *
+    * Only map values that are DescribedType instances are considered, and
+    * entries whose descriptor is null are skipped.
+    *
+    * @param filters   The filters map that should be searched.
+    * @param filterIds The aliases for the target filter to be located.
+    * @return the filter if found in the mapping or null if not found.
+    * @throws IllegalArgumentException if filterIds is null or empty.
+    */
+   public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
+
+      if (filterIds == null || filterIds.length == 0) {
+         throw new IllegalArgumentException("Invalid empty Filter Ids array passed: ");
+      }
+
+      if (filters == null || filters.isEmpty()) {
+         return null;
+      }
+
+      for (Map.Entry<Symbol, Object> filter : filters.entrySet()) {
+         if (!(filter.getValue() instanceof DescribedType)) {
+            continue;
+         }
+
+         DescribedType describedType = (DescribedType) filter.getValue();
+         Object descriptor = describedType.getDescriptor();
+         if (descriptor == null) {
+            // A described type without a descriptor cannot match any alias.
+            continue;
+         }
+
+         for (Object filterId : filterIds) {
+            if (descriptor.equals(filterId)) {
+               return new AbstractMap.SimpleImmutableEntry<>(filter.getKey(), describedType);
+            }
+         }
+      }
+
+      return null;
+   }
+
+   /**
+    * Conversion from Java ByteBuffer to a HawtBuf buffer.
+    *
+    * All remaining bytes of the ByteBuffer are consumed: on return its
+    * position equals its limit.
+    *
+    * @param data the ByteBuffer instance to convert.
+    * @return a new HawtBuf buffer converted from the given ByteBuffer, or null if data is null.
+    */
+   public static Buffer toBuffer(ByteBuffer data) {
+      if (data == null) {
+         return null;
+      }
+
+      Buffer rc;
+
+      if (data.hasArray()) {
+         // Accessible backing array: hand the ByteBuffer to HawtBuf directly.
+         // NOTE(review): Buffer(ByteBuffer) presumably wraps data.array(); confirm in HawtBuf.
+         rc = new Buffer(data);
+         data.position(data.limit());
+      }
+      else {
+         // Direct or read-only buffers expose no array, so copy the bytes out.
+         // Testing hasArray() instead of isDirect() keeps read-only heap
+         // buffers away from the array based path above.
+         rc = new Buffer(data.remaining());
+         data.get(rc.data);
+      }
+
+      return rc;
+   }
+
+   /**
+    * Given a long value, convert it to a byte array for marshalling.
+    *
+    * @param value the value to convert.
+    * @return a new byte array that holds the big endian value of the long.
+    */
+   public static byte[] toBytes(long value) {
+      Buffer buffer = new Buffer(8);
+      buffer.bigEndianEditor().writeLong(value);
+      return buffer.data;
+   }
+
+   /**
+    * Converts a Binary value to a long assuming that the contained value is
+    * stored in Big Endian encoding.
+    *
+    * @param value the Binary object whose payload is converted to a long; must not be null.
+    * @return a long value constructed from the bytes of the Binary instance.
+    */
+   public static long toLong(Binary value) {
+      Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
+      return buffer.bigEndianEditor().readLong();
+   }
+
+   /**
+    * Given an AMQP endpoint, create an ActiveMQDestination for its address.
+    * The address is passed to ActiveMQDestination.createDestination with
+    * QUEUE_TYPE as the default type.  If the endpoint is null or is an AMQP
+    * Coordinator type endpoint this method returns null to indicate no
+    * destination can be mapped.
+    *
+    * @param endpoint the AMQP endpoint to construct an ActiveMQDestination from.
+    * @return a new ActiveMQDestination that best matches the address of the given endpoint
+    * @throws AmqpProtocolException if the Source or Target carries no address.
+    * @throws RuntimeException if the endpoint is neither a Coordinator nor a Terminus.
+    */
+   public static ActiveMQDestination createDestination(Object endpoint) throws AmqpProtocolException {
+      if (endpoint == null || endpoint instanceof Coordinator) {
+         return null;
+      }
+
+      if (!(endpoint instanceof org.apache.qpid.proton.amqp.messaging.Terminus)) {
+         throw new RuntimeException("Unexpected terminus type: " + endpoint);
+      }
+
+      org.apache.qpid.proton.amqp.messaging.Terminus terminus = (org.apache.qpid.proton.amqp.messaging.Terminus) endpoint;
+      String address = terminus.getAddress();
+      if (address == null || address.length() == 0) {
+         if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
+            throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
+         }
+         else {
+            throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
+         }
+      }
+
+      return ActiveMQDestination.createDestination(address, ActiveMQDestination.QUEUE_TYPE);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
new file mode 100644
index 0000000..b99c56b
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base for all AmqpResource implementations to extend.
+ *
+ * This abstract class wraps up the basic state management bits so that the
+ * concrete objects don't have to reproduce them.  Provides hooks for the
+ * subclasses to initialize and shut down.
+ *
+ * @param <E> the proton Endpoint type managed by the resource.
+ */
+public abstract class AmqpAbstractResource<E extends Endpoint> implements 
AmqpResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AmqpAbstractResource.class);
+
+   // Pending open request; cleared once signalled by opened() or failed(Exception).
+   protected AsyncResult openRequest;
+   // Pending close / detach request; cleared once signalled by closed() or failed(Exception).
+   protected AsyncResult closeRequest;
+
+   private AmqpValidator amqpStateInspector;
+
+   private E endpoint;
+
+   /**
+    * Records the open request, runs doOpen() and then registers this
+    * resource as the context of the endpoint.
+    *
+    * @param request the result to signal once the open completes or fails.
+    */
+   @Override
+   public void open(AsyncResult request) {
+      this.openRequest = request;
+      doOpen();
+      getEndpoint().setContext(this);
+   }
+
+   /**
+    * @return true if the remote state of the endpoint is ACTIVE.
+    */
+   @Override
+   public boolean isOpen() {
+      return getEndpoint().getRemoteState() == EndpointState.ACTIVE;
+   }
+
+   /**
+    * Signals success on the pending open request, if there is one, and clears it.
+    */
+   @Override
+   public void opened() {
+      if (this.openRequest != null) {
+         this.openRequest.onSuccess();
+         this.openRequest = null;
+      }
+   }
+
+   /**
+    * Detaches the endpoint.  When either side is already CLOSED the request
+    * is signalled right away, otherwise it is stored as the pending close
+    * request and doDetach() is invoked.
+    *
+    * @param request the result to signal once the detach completes.
+    */
+   @Override
+   public void detach(AsyncResult request) {
+      // If already closed, signal success or else the caller might never get notified.
+      if (getEndpoint().getLocalState() == EndpointState.CLOSED || 
getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+
+         // Only the remote side is closed: finish the local side and free the endpoint.
+         if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
+            doDetach();
+            getEndpoint().free();
+         }
+
+         request.onSuccess();
+      }
+      else {
+         this.closeRequest = request;
+         doDetach();
+      }
+   }
+
+   /**
+    * Closes the endpoint.  When either side is already CLOSED the request
+    * is signalled right away, otherwise it is stored as the pending close
+    * request and doClose() is invoked.
+    *
+    * @param request the result to signal once the close completes.
+    */
+   @Override
+   public void close(AsyncResult request) {
+      // If already closed, signal success or else the caller might never get notified.
+      if (getEndpoint().getLocalState() == EndpointState.CLOSED || 
getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+
+         // Only the remote side is closed: finish the local side and free the endpoint.
+         if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
+            doClose();
+            getEndpoint().free();
+         }
+
+         request.onSuccess();
+      }
+      else {
+         this.closeRequest = request;
+         doClose();
+      }
+   }
+
+   /**
+    * @return true if the local state of the endpoint is CLOSED.
+    */
+   @Override
+   public boolean isClosed() {
+      return getEndpoint().getLocalState() == EndpointState.CLOSED;
+   }
+
+   /**
+    * Closes and frees the endpoint, then signals success on the pending
+    * close request, if there is one, and clears it.
+    */
+   @Override
+   public void closed() {
+      getEndpoint().close();
+      getEndpoint().free();
+
+      if (this.closeRequest != null) {
+         this.closeRequest.onSuccess();
+         this.closeRequest = null;
+      }
+   }
+
+   /**
+    * Fails any pending requests with a generic "Remote request failed." exception.
+    */
+   @Override
+   public void failed() {
+      failed(new Exception("Remote request failed."));
+   }
+
+   /**
+    * Fails the pending open and close requests with the given cause and
+    * clears them.  When an open request is pending the endpoint, if set,
+    * is closed first.
+    *
+    * @param cause the error handed to the pending requests.
+    */
+   @Override
+   public void failed(Exception cause) {
+      if (openRequest != null) {
+         if (endpoint != null) {
+            // TODO: if this is a producer/consumer link then we may only be detached,
+            // rather than fully closed, and should respond appropriately.
+            endpoint.close();
+         }
+         openRequest.onFailure(cause);
+         openRequest = null;
+      }
+
+      if (closeRequest != null) {
+         closeRequest.onFailure(cause);
+         closeRequest = null;
+      }
+   }
+
+   /**
+    * Handles a close initiated by the remote peer: converts the remote
+    * condition into an exception, closes the endpoint and reports the
+    * exception through the connection.
+    *
+    * @param connection the connection that is told about the resulting error.
+    */
+   @Override
+   public void remotelyClosed(AmqpConnection connection) {
+      // NOTE(review): getEndpoint() is dereferenced here before the null check below.
+      Exception error = 
AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
+
+      if (endpoint != null) {
+         // TODO: if this is a producer/consumer link then we may only be detached,
+         // rather than fully closed, and should respond appropriately.
+         endpoint.close();
+      }
+
+      LOG.info("Resource {} was remotely closed", this);
+
+      connection.fireClientException(error);
+   }
+
+   /**
+    * Handles a locally triggered close: closes the endpoint if set and
+    * reports the given error through the connection.
+    *
+    * @param connection the connection that is told about the error.
+    * @param error      the error to report.
+    */
+   @Override
+   public void locallyClosed(AmqpConnection connection, Exception error) {
+      if (endpoint != null) {
+         // TODO: if this is a producer/consumer link then we may only be detached,
+         // rather than fully closed, and should respond appropriately.
+         endpoint.close();
+      }
+
+      LOG.info("Resource {} was locally closed", this);
+
+      connection.fireClientException(error);
+   }
+
+   /**
+    * @return the managed endpoint, null if none has been set.
+    */
+   public E getEndpoint() {
+      return this.endpoint;
+   }
+
+   /**
+    * @param endpoint the endpoint this resource manages.
+    */
+   public void setEndpoint(E endpoint) {
+      this.endpoint = endpoint;
+   }
+
+   /**
+    * @return the validator assigned via setStateInspector, null if none was assigned.
+    */
+   public AmqpValidator getStateInspector() {
+      return amqpStateInspector;
+   }
+
+   /**
+    * Assigns the state inspector; a null argument is replaced by a new
+    * AmqpValidator instance.
+    *
+    * @param stateInspector the validator to use, may be null.
+    */
+   public void setStateInspector(AmqpValidator stateInspector) {
+      if (stateInspector == null) {
+         stateInspector = new AmqpValidator();
+      }
+
+      this.amqpStateInspector = stateInspector;
+   }
+
+   /**
+    * @return the local state of the endpoint, UNINITIALIZED when no endpoint is set.
+    */
+   public EndpointState getLocalState() {
+      if (getEndpoint() == null) {
+         return EndpointState.UNINITIALIZED;
+      }
+      return getEndpoint().getLocalState();
+   }
+
+   /**
+    * @return the remote state of the endpoint, UNINITIALIZED when no endpoint is set.
+    */
+   public EndpointState getRemoteState() {
+      if (getEndpoint() == null) {
+         return EndpointState.UNINITIALIZED;
+      }
+      return getEndpoint().getRemoteState();
+   }
+
+   /**
+    * @return true if the remote condition of the endpoint carries a condition value.
+    */
+   public boolean hasRemoteError() {
+      // NOTE(review): assumes getRemoteCondition() never returns null; confirm against proton.
+      return getEndpoint().getRemoteCondition().getCondition() != null;
+   }
+
+   /**
+    * Runs the open inspection hook followed by the open completion hook.
+    */
+   @Override
+   public void processRemoteOpen(AmqpConnection connection) throws IOException 
{
+      doOpenInspection();
+      doOpenCompletion();
+   }
+
+   /**
+    * Runs the detach inspection hook, then either completes a pending close
+    * request or treats the detach as a remote close.
+    */
+   @Override
+   public void processRemoteDetach(AmqpConnection connection) throws 
IOException {
+      doDetachedInspection();
+      if (isAwaitingClose()) {
+         LOG.debug("{} is now closed: ", this);
+         closed();
+      }
+      else {
+         remotelyClosed(connection);
+      }
+   }
+
+   /**
+    * Runs the close inspection hook, then completes a pending close
+    * request, fails a pending open request, or treats the event as a
+    * remote close, in that order of precedence.
+    */
+   @Override
+   public void processRemoteClose(AmqpConnection connection) throws 
IOException {
+      doClosedInspection();
+      if (isAwaitingClose()) {
+         LOG.debug("{} is now closed: ", this);
+         closed();
+      }
+      else if (isAwaitingOpen()) {
+         // Error on Open, create exception and signal failure.
+         LOG.warn("Open of {} failed: ", this);
+         Exception openError;
+         if (hasRemoteError()) {
+            openError = 
AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
+         }
+         else {
+            openError = getOpenAbortException();
+         }
+
+         failed(openError);
+      }
+      else {
+         remotelyClosed(connection);
+      }
+   }
+
+   /**
+    * No-op by default.
+    */
+   @Override
+   public void processDeliveryUpdates(AmqpConnection connection) throws 
IOException {
+   }
+
+   /**
+    * No-op by default.
+    */
+   @Override
+   public void processFlowUpdates(AmqpConnection connection) throws 
IOException {
+   }
+
+   /**
+    * Perform the open operation on the managed endpoint.  A subclass may
+    * override this method to provide additional open actions or configuration
+    * updates.
+    */
+   protected void doOpen() {
+      getEndpoint().open();
+   }
+
+   /**
+    * Perform the close operation on the managed endpoint.  A subclass may
+    * override this method to provide additional close actions or alter the
+    * standard close path such as endpoint detach etc.
+    */
+   protected void doClose() {
+      getEndpoint().close();
+   }
+
+   /**
+    * Perform the detach operation on the managed endpoint.
+    *
+    * By default this method throws an UnsupportedOperationException, a
+    * subclass must implement this and do a detach if its resource supports that.
+    */
+   protected void doDetach() {
+      throw new UnsupportedOperationException("Endpoint cannot be detached.");
+   }
+
+   /**
+    * Complete the open operation on the managed endpoint. A subclass may
+    * override this method to provide additional verification actions or
+    * configuration updates.
+    */
+   protected void doOpenCompletion() {
+      LOG.debug("{} is now open: ", this);
+      opened();
+   }
+
+   /**
+    * When aborting the open operation, and there isn't an error condition
+    * provided by the peer, the returned exception will be used instead.
+    * A subclass may override this method to provide alternative behaviour.
+    */
+   protected Exception getOpenAbortException() {
+      return new IOException("Open failed unexpectedly.");
+   }
+
+   // TODO - Find a more generic way to do this.
+   // Inspection hook invoked from processRemoteOpen before the open completes.
+   protected abstract void doOpenInspection();
+
+   // Inspection hook invoked first from processRemoteClose.
+   protected abstract void doClosedInspection();
+
+   // Inspection hook invoked first from processRemoteDetach; no-op by default.
+   protected void doDetachedInspection() {
+   }
+
+   //----- Private implementation utility methods ---------------------------//
+
+   // True while an open request has been recorded and not yet signalled.
+   private boolean isAwaitingOpen() {
+      return this.openRequest != null;
+   }
+
+   // True while a close / detach request has been recorded and not yet signalled.
+   private boolean isAwaitingClose() {
+      return this.closeRequest != null;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
new file mode 100644
index 0000000..001942e
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
+import 
org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connection instance used to connect to the Broker using Proton as
+ * the AMQP protocol handler.
+ */
+public class AmqpClient {
+
+   private static final Logger LOG = LoggerFactory.getLogger(AmqpClient.class);
+
+   private final String username;
+   private final String password;
+   private final URI remoteURI;
+   private String authzid;
+   private String mechanismRestriction;
+
+   private AmqpValidator stateInspector = new AmqpValidator();
+   private List<Symbol> offeredCapabilities = Collections.emptyList();
+   private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
+
+   /**
+    * Creates an AmqpClient instance which can be used as a factory for 
connections.
+    *
+    * @param remoteURI The address of the remote peer to connect to.
+    * @param username  The user name to use when authenticating the client.
+    * @param password  The password to use when authenticating the client.
+    */
+   public AmqpClient(URI remoteURI, String username, String password) {
+      this.remoteURI = remoteURI;
+      this.password = password;
+      this.username = username;
+   }
+
+   /**
+    * Creates a connection with the broker at the given location, this method 
initiates a
+    * connect attempt immediately and will fail if the remote peer cannot be 
reached.
+    *
+    * @throws Exception if an error occurs attempting to connect to the Broker.
+    * @return a new connection object used to interact with the connected peer.
+    */
+   public AmqpConnection connect() throws Exception {
+
+      AmqpConnection connection = createConnection();
+
+      LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
+      connection.connect();
+
+      return connection;
+   }
+
+   /**
+    * Creates a connection object using the configured values for user, 
password, remote URI
+    * etc.  This method does not immediately initiate a connection to the 
remote leaving that
+    * to the caller which provides a connection object that can have 
additional configuration
+    * changes applied before the <code>connect</code> method is invoked.
+    *
+    * @throws Exception if an error occurs while creating the connection object.
+    * @return a new, not yet connected, connection object used to interact with the remote peer.
+    */
+   public AmqpConnection createConnection() throws Exception {
+      if (username == null && password != null) {
+         throw new IllegalArgumentException("Password must be null if user 
name value is null");
+      }
+
+      NettyTransport transport = 
NettyTransportFactory.createTransport(remoteURI);
+      AmqpConnection connection = new AmqpConnection(transport, username, 
password);
+
+      connection.setMechanismRestriction(mechanismRestriction);
+      connection.setAuthzid(authzid);
+
+      connection.setOfferedCapabilities(getOfferedCapabilities());
+      connection.setOfferedProperties(getOfferedProperties());
+      connection.setStateInspector(getStateInspector());
+
+      return connection;
+   }
+
+   /**
+    * @return the user name value given when constructed.
+    */
+   public String getUsername() {
+      return username;
+   }
+
+   /**
+    * @return the password value given when constructed.
+    */
+   public String getPassword() {
+      return password;
+   }
+
+   /**
+    * @param authzid The authzid used when authenticating (currently only with 
PLAIN)
+    */
+   public void setAuthzid(String authzid) {
+      this.authzid = authzid;
+   }
+
+   public String getAuthzid() {
+      return authzid;
+   }
+
+   /**
+    * @param mechanismRestriction The mechanism to use when authenticating (if 
offered by the server)
+    */
+   public void setMechanismRestriction(String mechanismRestriction) {
+      this.mechanismRestriction = mechanismRestriction;
+   }
+
+   public String getMechanismRestriction() {
+      return mechanismRestriction;
+   }
+
+   /**
+    * @return the currently set address to use to connect to the AMQP peer.
+    */
+   public URI getRemoteURI() {
+      return remoteURI;
+   }
+
+   /**
+    * Sets the offered capabilities that should be used when a new connection attempt
+    * is made.
+    *
+    * @param offeredCapabilities the list of capabilities to offer when connecting,
+    *                            a null value is treated as an empty list.
+    */
+   public void setOfferedCapabilities(List<Symbol> offeredCapabilities) {
+      // Only fall back to the empty list when nothing was supplied; the stored
+      // value must never be null because the getter wraps it in an unmodifiable view.
+      if (offeredCapabilities == null) {
+         offeredCapabilities = Collections.emptyList();
+      }
+
+      this.offeredCapabilities = offeredCapabilities;
+   }
+
+   /**
+    * @return an unmodifiable view of the currently set offered capabilities
+    */
+   public List<Symbol> getOfferedCapabilities() {
+      return Collections.unmodifiableList(offeredCapabilities);
+   }
+
+   /**
+    * Sets the offered connection properties that should be used when a new connection
+    * attempt is made.
+    *
+    * @param offeredProperties the map of properties to offer when connecting,
+    *                          a null value is treated as an empty map.
+    */
+   public void setOfferedProperties(Map<Symbol, Object> offeredProperties) {
+      // Only fall back to the empty map when nothing was supplied; the stored
+      // value must never be null because the getter wraps it in an unmodifiable view.
+      if (offeredProperties == null) {
+         offeredProperties = Collections.emptyMap();
+      }
+
+      this.offeredProperties = offeredProperties;
+   }
+
+   /**
+    * @return an unmodifiable view of the currently set connection properties.
+    */
+   public Map<Symbol, Object> getOfferedProperties() {
+      return Collections.unmodifiableMap(offeredProperties);
+   }
+
+   /**
+    * @return the currently set state inspector used to check state after 
various events.
+    */
+   public AmqpValidator getStateInspector() {
+      return stateInspector;
+   }
+
+   /**
+    * Sets the state inspector used to check that the AMQP resource is valid 
after
+    * specific lifecycle events such as open and close.
+    *
+    * @param stateInspector the new state inspector to use.
+    */
+   public void setValidator(AmqpValidator stateInspector) {
+      if (stateInspector == null) {
+         stateInspector = new AmqpValidator();
+      }
+
+      this.stateInspector = stateInspector;
+   }
+
+   @Override
+   public String toString() {
+      return "AmqpClient: " + getRemoteURI().getHost() + ":" + 
getRemoteURI().getPort();
+   }
+
+   /**
+    * Creates an anonymous connection with the broker at the given location.
+    *
+    * @param broker the address of the remote broker instance.
+    * @throws Exception if an error occurs attempting to connect to the Broker.
+    * @return a new connection object used to interact with the connected peer.
+    */
+   public static AmqpConnection connect(URI broker) throws Exception {
+      return connect(broker, null, null);
+   }
+
+   /**
+    * Creates a connection with the broker at the given location.
+    *
+    * @param broker   the address of the remote broker instance.
+    * @param username the user name to use to connect to the broker or null 
for anonymous.
+    * @param password the password to use to connect to the broker, must be 
null if user name is null.
+    * @throws Exception if an error occurs attempting to connect to the Broker.
+    * @return a new connection object used to interact with the connected peer.
+    */
+   public static AmqpConnection connect(URI broker, String username, String 
password) throws Exception {
+      if (username == null && password != null) {
+         throw new IllegalArgumentException("Password must be null if user 
name value is null");
+      }
+
+      AmqpClient client = new AmqpClient(broker, username, password);
+
+      return client.connect();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
new file mode 100644
index 0000000..1454dd9
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -0,0 +1,720 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.activemq.transport.InactivityIOException;
+import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
+import 
org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.IdGenerator;
+import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Event.Type;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+
+public class AmqpConnection extends AmqpAbstractResource<Connection> 
implements NettyTransportListener {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AmqpConnection.class);
+
+   private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
+
+   private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1;
+   // NOTE: Limit default channel max to signed short range to deal with
+   //       brokers that don't currently handle the unsigned range well.
+   private static final int DEFAULT_CHANNEL_MAX = 32767;
+   private static final IdGenerator CONNECTION_ID_GENERATOR = new 
IdGenerator();
+
+   public static final long DEFAULT_CONNECT_TIMEOUT = 515000;
+   public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
+   public static final long DEFAULT_DRAIN_TIMEOUT = 60000;
+
+   private final ScheduledExecutorService serializer;
+   private final AtomicBoolean closed = new AtomicBoolean();
+   private final AtomicBoolean connected = new AtomicBoolean();
+   private final AtomicLong sessionIdGenerator = new AtomicLong();
+   private final AtomicLong txIdGenerator = new AtomicLong();
+   private final Collector protonCollector = new CollectorImpl();
+   private final 
org.apache.activemq.transport.amqp.client.transport.NettyTransport transport;
+   private final Transport protonTransport = Transport.Factory.create();
+
+   private final String username;
+   private final String password;
+   private final URI remoteURI;
+   private final String connectionId;
+   private List<Symbol> offeredCapabilities = Collections.emptyList();
+   private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
+
+   private AmqpConnectionListener listener;
+   private SaslAuthenticator authenticator;
+   private String mechanismRestriction;
+   private String authzid;
+
+   private int idleTimeout = 0;
+   private boolean idleProcessingDisabled;
+   private String containerId;
+   private boolean authenticated;
+   private int channelMax = DEFAULT_CHANNEL_MAX;
+   private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+   private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
+   private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
+
+   /**
+    * Creates a new connection wrapper around the given transport.  A fresh Proton
+    * Connection endpoint is created and bound to the event collector, a single
+    * threaded serializer is created to run all work against the Proton engine,
+    * and this instance registers itself as the listener of the transport.
+    *
+    * @param transport the transport used to exchange bytes with the remote peer.
+    * @param username  the user name to authenticate with, may be null.
+    * @param password  the password to authenticate with, may be null.
+    */
+   public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport,
+                         String username,
+                         String password) {
+      setEndpoint(Connection.Factory.create());
+      getEndpoint().collect(protonCollector);
+
+      this.transport = transport;
+      this.username = username;
+      this.password = password;
+      this.connectionId = CONNECTION_ID_GENERATOR.generateId();
+      this.remoteURI = transport.getRemoteLocation();
+
+      this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+         @Override
+         public Thread newThread(Runnable runner) {
+            Thread serial = new Thread(runner);
+            serial.setDaemon(true);
+            // Qualify the call: an unqualified toString() here resolves to the
+            // anonymous ThreadFactory and not to the enclosing connection.
+            serial.setName(AmqpConnection.this.toString());
+            return serial;
+         }
+      });
+
+      this.transport.setTransportListener(this);
+   }
+
+   public void connect() throws Exception {
+      if (connected.compareAndSet(false, true)) {
+         transport.connect();
+
+         final ClientFuture future = new ClientFuture();
+         serializer.execute(new Runnable() {
+            @Override
+            public void run() {
+               getEndpoint().setContainer(safeGetContainerId());
+               getEndpoint().setHostname(remoteURI.getHost());
+               if (!getOfferedCapabilities().isEmpty()) {
+                  
getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new 
Symbol[0]));
+               }
+               if (!getOfferedProperties().isEmpty()) {
+                  getEndpoint().setProperties(getOfferedProperties());
+               }
+
+               if (getIdleTimeout() > 0) {
+                  protonTransport.setIdleTimeout(getIdleTimeout());
+               }
+               protonTransport.setMaxFrameSize(getMaxFrameSize());
+               protonTransport.setChannelMax(getChannelMax());
+               protonTransport.bind(getEndpoint());
+               Sasl sasl = protonTransport.sasl();
+               if (sasl != null) {
+                  sasl.client();
+               }
+               authenticator = new SaslAuthenticator(sasl, username, password, 
authzid, mechanismRestriction);
+               open(future);
+
+               pumpToProtonTransport(future);
+            }
+         });
+
+         if (connectTimeout <= 0) {
+            future.sync();
+         }
+         else {
+            future.sync(connectTimeout, TimeUnit.MILLISECONDS);
+            if (getEndpoint().getRemoteState() != EndpointState.ACTIVE) {
+               throw new IOException("Failed to connect after configured 
timeout.");
+            }
+         }
+      }
+   }
+
+   public boolean isConnected() {
+      return transport.isConnected() && connected.get();
+   }
+
+   /**
+    * Closes this connection.  Only the first call has any effect.  The close of
+    * the Proton endpoint is requested on the serializer thread and this method
+    * waits for it to complete (bounded by the close timeout when that is greater
+    * than zero).  The transport is then closed and the serializer shut down,
+    * whether or not the wait succeeded.
+    */
+   public void close() {
+      if (closed.compareAndSet(false, true)) {
+         final ClientFuture request = new ClientFuture();
+         serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+               try {
+
+                  // If we are not connected then there is nothing we can do now
+                  // just signal success.
+                  // NOTE(review): processing continues after signalling success here,
+                  // confirm whether an early return was intended.
+                  if (!transport.isConnected()) {
+                     request.onSuccess();
+                  }
+
+                  if (getEndpoint() != null) {
+                     close(request);
+                  }
+                  else {
+                     request.onSuccess();
+                  }
+
+                  pumpToProtonTransport(request);
+               }
+               catch (Exception e) {
+                  // Keep the failure in the log instead of discarding it.
+                  LOG.debug("Caught exception while closing proton connection", e);
+               }
+            }
+         });
+
+         try {
+            if (closeTimeout <= 0) {
+               request.sync();
+            }
+            else {
+               request.sync(closeTimeout, TimeUnit.MILLISECONDS);
+            }
+         }
+         catch (IOException e) {
+            // The placeholder is required, without it SLF4J drops the argument.
+            LOG.warn("Error caught while closing Provider: {}", e.getMessage());
+         }
+         finally {
+            if (transport != null) {
+               try {
+                  transport.close();
+               }
+               catch (Exception e) {
+                  LOG.debug("Caught exception while closing down Transport: {}", e.getMessage());
+               }
+            }
+
+            serializer.shutdown();
+         }
+      }
+   }
+
+   /**
+    * Creates a new Session instance used to create AMQP resources like
+    * senders and receivers.  The session is opened on the serializer thread
+    * and this method blocks until the open request completes.
+    *
+    * @return a new AmqpSession that can be used to create links.
+    * @throws Exception if an error occurs during creation, including the
+    *                   connection being closed before the session was opened.
+    */
+   public AmqpSession createSession() throws Exception {
+      checkClosed();
+
+      final AmqpSession session = new AmqpSession(AmqpConnection.this, getNextSessionId());
+      final ClientFuture request = new ClientFuture();
+
+      serializer.execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               // The connection may have been closed between the check made by
+               // the caller thread and this task actually running.
+               checkClosed();
+               session.setEndpoint(getEndpoint().session());
+               session.setStateInspector(getStateInspector());
+               session.open(request);
+               pumpToProtonTransport(request);
+            }
+            catch (RuntimeException e) {
+               // Always complete the request, otherwise the caller blocked in
+               // sync() below would never be released.
+               request.onFailure(e);
+            }
+         }
+      });
+
+      request.sync();
+
+      return session;
+   }
+
+   //----- Access to low level IO for specific test cases -------------------//
+
+   /**
+    * Writes the given bytes directly to the transport, bypassing the Proton
+    * engine.  The write is performed on the serializer thread and this method
+    * blocks until it has been attempted.  An IOException raised by the write is
+    * reported to the connection listener and not thrown to the caller.
+    *
+    * @param rawData the bytes to write to the transport.
+    * @throws Exception if the connection is closed or the wait for the write fails.
+    */
+   public void sendRawBytes(final byte[] rawData) throws Exception {
+      checkClosed();
+
+      final ClientFuture request = new ClientFuture();
+
+      serializer.execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               // The connection may have been closed between the check made by
+               // the caller thread and this task actually running.
+               checkClosed();
+            }
+            catch (IllegalStateException e) {
+               // Complete the request so the caller blocked in sync() is released.
+               request.onFailure(e);
+               return;
+            }
+
+            try {
+               transport.send(Unpooled.wrappedBuffer(rawData));
+            }
+            catch (IOException e) {
+               fireClientException(e);
+            }
+            finally {
+               request.onSuccess();
+            }
+         }
+      });
+
+      request.sync();
+   }
+
+   //----- Configuration accessors ------------------------------------------//
+
+   /**
+    * @return the user name that was used to authenticate this connection.
+    */
+   public String getUsername() {
+      return username;
+   }
+
+   /**
+    * @return the password that was used to authenticate this connection.
+    */
+   public String getPassword() {
+      return password;
+   }
+
+   public void setAuthzid(String authzid) {
+      this.authzid = authzid;
+   }
+
+   public String getAuthzid() {
+      return authzid;
+   }
+
+   /**
+    * @return the URI of the remote peer this connection attached to.
+    */
+   public URI getRemoteURI() {
+      return remoteURI;
+   }
+
+   /**
+    * @return the container ID that will be set as the container Id.
+    */
+   public String getContainerId() {
+      return this.containerId;
+   }
+
+   /**
+    * Sets the container Id that will be configured on the connection prior to
+    * connecting to the remote peer.  Calling this after connect has no effect.
+    *
+    * @param containerId the container Id to use on the connection.
+    */
+   public void setContainerId(String containerId) {
+      this.containerId = containerId;
+   }
+
+   /**
+    * @return the currently set Max Frame Size value.
+    */
+   public int getMaxFrameSize() {
+      return DEFAULT_MAX_FRAME_SIZE;
+   }
+
+   public int getChannelMax() {
+      return channelMax;
+   }
+
+   public void setChannelMax(int channelMax) {
+      this.channelMax = channelMax;
+   }
+
+   public long getConnectTimeout() {
+      return connectTimeout;
+   }
+
+   public void setConnectTimeout(long connectTimeout) {
+      this.connectTimeout = connectTimeout;
+   }
+
+   public long getCloseTimeout() {
+      return closeTimeout;
+   }
+
+   public void setCloseTimeout(long closeTimeout) {
+      this.closeTimeout = closeTimeout;
+   }
+
+   public long getDrainTimeout() {
+      return drainTimeout;
+   }
+
+   public void setDrainTimeout(long drainTimeout) {
+      this.drainTimeout = drainTimeout;
+   }
+
+   public List<Symbol> getOfferedCapabilities() {
+      return offeredCapabilities;
+   }
+
+   /**
+    * Sets the capabilities that are offered to the remote peer when connecting.
+    *
+    * @param offeredCapabilities the list of capabilities to offer, a null value
+    *                            is treated as an empty list.
+    */
+   public void setOfferedCapabilities(List<Symbol> offeredCapabilities) {
+      // Only substitute the empty list when nothing was supplied; connect()
+      // calls isEmpty() on the stored value so it must never be null.
+      if (offeredCapabilities == null) {
+         offeredCapabilities = Collections.emptyList();
+      }
+
+      this.offeredCapabilities = offeredCapabilities;
+   }
+
+   public Map<Symbol, Object> getOfferedProperties() {
+      return offeredProperties;
+   }
+
+   /**
+    * Sets the connection properties that are offered to the remote peer when connecting.
+    *
+    * @param offeredProperties the map of properties to offer, a null value is
+    *                          treated as an empty map.
+    */
+   public void setOfferedProperties(Map<Symbol, Object> offeredProperties) {
+      // Only substitute the empty map when nothing was supplied; connect()
+      // calls isEmpty() on the stored value so it must never be null.
+      if (offeredProperties == null) {
+         offeredProperties = Collections.emptyMap();
+      }
+
+      this.offeredProperties = offeredProperties;
+   }
+
+   public Connection getConnection() {
+      return new UnmodifiableConnection(getEndpoint());
+   }
+
+   public AmqpConnectionListener getListener() {
+      return listener;
+   }
+
+   public void setListener(AmqpConnectionListener listener) {
+      this.listener = listener;
+   }
+
+   public int getIdleTimeout() {
+      return idleTimeout;
+   }
+
+   public void setIdleTimeout(int idleTimeout) {
+      this.idleTimeout = idleTimeout;
+   }
+
+   public void setIdleProcessingDisabled(boolean value) {
+      this.idleProcessingDisabled = value;
+   }
+
+   public boolean isIdleProcessingDisabled() {
+      return idleProcessingDisabled;
+   }
+
+   /**
+    * Sets a restriction on the SASL mechanism to use (if offered by the 
server).
+    *
+    * @param mechanismRestriction the mechanism to use
+    */
+   public void setMechanismRestriction(String mechanismRestriction) {
+      this.mechanismRestriction = mechanismRestriction;
+   }
+
+   public String getMechanismRestriction() {
+      return mechanismRestriction;
+   }
+
+   //----- Internal getters used from the child AmqpResource classes --------//
+
+   ScheduledExecutorService getScheduler() {
+      return this.serializer;
+   }
+
+   Connection getProtonConnection() {
+      return getEndpoint();
+   }
+
+   String getConnectionId() {
+      return this.connectionId;
+   }
+
+   AmqpTransactionId getNextTransactionId() {
+      return new AmqpTransactionId(connectionId + ":" + 
txIdGenerator.incrementAndGet());
+   }
+
+   void pumpToProtonTransport() {
+      pumpToProtonTransport(NOOP_REQUEST);
+   }
+
+   void pumpToProtonTransport(AsyncResult request) {
+      try {
+         boolean done = false;
+         while (!done) {
+            ByteBuffer toWrite = protonTransport.getOutputBuffer();
+            if (toWrite != null && toWrite.hasRemaining()) {
+               ByteBuf outbound = 
transport.allocateSendBuffer(toWrite.remaining());
+               outbound.writeBytes(toWrite);
+               transport.send(outbound);
+               protonTransport.outputConsumed();
+            }
+            else {
+               done = true;
+            }
+         }
+      }
+      catch (IOException e) {
+         fireClientException(e);
+         request.onFailure(e);
+      }
+   }
+
+   //----- Transport listener event hooks -----------------------------------//
+
+   /**
+    * Handles bytes read from the transport.  The buffer is retained and handed to
+    * the serializer thread, which copies it into the Proton transport input
+    * buffer, processes the resulting engine events and writes any pending output.
+    *
+    * @param incoming the buffer holding the bytes that were read.
+    */
+   @Override
+   public void onData(final ByteBuf incoming) {
+
+      // We need to retain until the serializer gets around to processing it.
+      ReferenceCountUtil.retain(incoming);
+
+      try {
+         serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+               try {
+                  ByteBuffer source = incoming.nioBuffer();
+                  LOG.trace("Client Received from Broker {} bytes:", source.remaining());
+
+                  if (protonTransport.isClosed()) {
+                     LOG.debug("Ignoring incoming data because transport is closed");
+                     return;
+                  }
+
+                  do {
+                     ByteBuffer buffer = protonTransport.getInputBuffer();
+                     int limit = Math.min(buffer.remaining(), source.remaining());
+                     ByteBuffer duplicate = source.duplicate();
+                     duplicate.limit(source.position() + limit);
+                     buffer.put(duplicate);
+                     protonTransport.processInput();
+                     source.position(source.position() + limit);
+                  } while (source.hasRemaining());
+               }
+               finally {
+                  // Balance the retain() above on every exit path, including the
+                  // early return for a closed transport and any failure in the loop.
+                  ReferenceCountUtil.release(incoming);
+               }
+
+               // Process the state changes from the latest data and then answer back
+               // any pending updates to the Broker.
+               processUpdates();
+               pumpToProtonTransport();
+            }
+         });
+      }
+      catch (RuntimeException e) {
+         // The task was not accepted so it will never run, release the reference
+         // taken above before propagating the failure.
+         ReferenceCountUtil.release(incoming);
+         throw e;
+      }
+   }
+
+   @Override
+   public void onTransportClosed() {
+      LOG.debug("The transport has unexpectedly closed");
+      failed(getOpenAbortException());
+   }
+
+   @Override
+   public void onTransportError(Throwable cause) {
+      fireClientException(cause);
+   }
+
+   //----- Internal implementation ------------------------------------------//
+
+   @Override
+   protected void doOpenCompletion() {
+      // If the remote indicates that a close is pending, don't open.
+      if (getEndpoint().getRemoteProperties() == null || 
!getEndpoint().getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
+
+         if (!isIdleProcessingDisabled()) {
+            // Using nano time since it is not related to the wall clock, 
which may change
+            long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+            long initialKeepAliveDeadline = protonTransport.tick(initialNow);
+            if (initialKeepAliveDeadline > 0) {
+
+               getScheduler().schedule(new Runnable() {
+
+                  @Override
+                  public void run() {
+                     try {
+                        if (getEndpoint().getLocalState() != 
EndpointState.CLOSED) {
+                           LOG.debug("Client performing next idle check");
+                           // Using nano time since it is not related to the 
wall clock, which may change
+                           long now = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+                           long rescheduleAt = protonTransport.tick(now) - now;
+                           pumpToProtonTransport();
+                           if (protonTransport.isClosed()) {
+                              LOG.debug("Transport closed after inactivity 
check.");
+                              throw new InactivityIOException("Channel was 
inactive for to long");
+                           }
+
+                           if (rescheduleAt > 0) {
+                              getScheduler().schedule(this, rescheduleAt, 
TimeUnit.MILLISECONDS);
+                           }
+                        }
+                     }
+                     catch (Exception e) {
+                        try {
+                           transport.close();
+                        }
+                        catch (IOException e1) {
+                        }
+                        fireClientException(e);
+                     }
+                  }
+               }, initialKeepAliveDeadline - initialNow, 
TimeUnit.MILLISECONDS);
+            }
+         }
+         super.doOpenCompletion();
+      }
+   }
+
+   @Override
+   protected void doOpenInspection() {
+      try {
+         getStateInspector().inspectOpenedResource(getConnection());
+      }
+      catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
+   @Override
+   protected void doClosedInspection() {
+      try {
+         getStateInspector().inspectClosedResource(getConnection());
+      }
+      catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
+   protected void fireClientException(Throwable ex) {
+      AmqpConnectionListener listener = this.listener;
+      if (listener != null) {
+         listener.onException(ex);
+      }
+   }
+
+   protected void checkClosed() throws IllegalStateException {
+      if (closed.get()) {
+         throw new IllegalStateException("The Connection is already closed");
+      }
+   }
+
+   private void processUpdates() {
+      try {
+         Event protonEvent = null;
+         while ((protonEvent = protonCollector.peek()) != null) {
+            if (!protonEvent.getType().equals(Type.TRANSPORT)) {
+               LOG.trace("Client: New Proton Event: {}", 
protonEvent.getType());
+            }
+
+            AmqpEventSink amqpEventSink = null;
+            switch (protonEvent.getType()) {
+               case CONNECTION_REMOTE_CLOSE:
+                  amqpEventSink = (AmqpEventSink) 
protonEvent.getConnection().getContext();
+                  amqpEventSink.processRemoteClose(this);
+                  break;
+               case CONNECTION_REMOTE_OPEN:
+                  amqpEventSink = (AmqpEventSink) 
protonEvent.getConnection().getContext();
+                  amqpEventSink.processRemoteOpen(this);
+                  break;
+               case SESSION_REMOTE_CLOSE:
+                  amqpEventSink = (AmqpEventSink) 
protonEvent.getSession().getContext();
+                  amqpEventSink.processRemoteClose(this);
+                  break;
+               case SESSION_REMOTE_OPEN:
+                  amqpEventSink = (AmqpEventSink) 
protonEvent.getSession().getContext();
+                  amqpEventSink.processRemoteOpen(this);
+                  break;
+               case LINK_REMOTE_CLOSE:
+                  amqpEventSink = (AmqpEventSink) 
protonEvent.getLink().getContext();
+                  amqpEventSink.processRemoteClose(this);
+                  break;
+               case LINK_REMOTE_DETACH:
+                  amqpEventSink = (AmqpEventSink) 
protonEvent.getLink().getContext();
+                  amqpEventSink.processRemoteDetach(this);
+                  break;
+               case LINK_REMOTE_OPEN:
+                  amqpEventSink = (AmqpEventSink) 
protonEvent.getLink().getContext();
+                  amqpEventSink.processRemoteOpen(this);
+                  break;
+               case LINK_FLOW:
+                  amqpEventSink = (AmqpEventSink) 
protonEvent.getLink().getContext();
+                  amqpEventSink.processFlowUpdates(this);
+                  break;
+               case DELIVERY:
+                  amqpEventSink = (AmqpEventSink) 
protonEvent.getLink().getContext();
+                  amqpEventSink.processDeliveryUpdates(this);
+                  break;
+               default:
+                  break;
+            }
+
+            protonCollector.pop();
+         }
+
+         // We have to do this to pump SASL bytes in as SASL is not event 
driven yet.
+         if (!authenticated) {
+            processSaslAuthentication();
+         }
+      }
+      catch (Exception ex) {
+         LOG.warn("Caught Exception during update processing: {}", 
ex.getMessage(), ex);
+         fireClientException(ex);
+      }
+   }
+
+   private void processSaslAuthentication() {
+      if (authenticated || authenticator == null) {
+         return;
+      }
+
+      try {
+         if (authenticator.authenticate()) {
+            authenticator = null;
+            authenticated = true;
+         }
+      }
+      catch (SecurityException ex) {
+         failed(ex);
+      }
+   }
+
+   private String getNextSessionId() {
+      return connectionId + ":" + sessionIdGenerator.incrementAndGet();
+   }
+
+   private String safeGetContainerId() {
+      String containerId = getContainerId();
+      if (containerId == null || containerId.isEmpty()) {
+         containerId = UUID.randomUUID().toString();
+      }
+
+      return containerId;
+   }
+
+   @Override
+   public String toString() {
+      return "AmqpConnection { " + connectionId + " }";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java
new file mode 100644
index 0000000..822170a
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+/**
+ * Event points exposed by the AmqpClient object.
+ */
+public interface AmqpConnectionListener {
+
+   /**
+    * Indicates some error has occurred during client operations.
+    *
+    * @param ex The error that triggered this event.
+    */
+   void onException(Throwable ex);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java
new file mode 100644
index 0000000..d2492e9
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+/**
+ * Default listener implementation that stubs out all the event methods.
+ */
+public class AmqpDefaultConnectionListener implements AmqpConnectionListener {
+
+   @Override
+   public void onException(Throwable ex) {
+      // Intentionally a no-op: this default implementation ignores the error.
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
new file mode 100644
index 0000000..1c511a5
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+
+/**
+ * Interface used by classes that want to process AMQP events sent from
+ * the transport layer.
+ */
+public interface AmqpEventSink {
+
+   /**
+    * Event handler for remote peer open of this resource.
+    *
+    * @param connection the AmqpConnection instance for easier access to fire events.
+    * @throws IOException if an error occurs while processing the update.
+    */
+   void processRemoteOpen(AmqpConnection connection) throws IOException;
+
+   /**
+    * Event handler for remote peer detach of this resource.
+    *
+    * @param connection the AmqpConnection instance for easier access to fire events.
+    * @throws IOException if an error occurs while processing the update.
+    */
+   void processRemoteDetach(AmqpConnection connection) throws IOException;
+
+   /**
+    * Event handler for remote peer close of this resource.
+    *
+    * @param connection the AmqpConnection instance for easier access to fire events.
+    * @throws IOException if an error occurs while processing the update.
+    */
+   void processRemoteClose(AmqpConnection connection) throws IOException;
+
+   /**
+    * Called when the Proton Engine signals a Delivery related event has been triggered
+    * for the given endpoint.
+    *
+    * @param connection the AmqpConnection instance for easier access to fire events.
+    * @throws IOException if an error occurs while processing the update.
+    */
+   void processDeliveryUpdates(AmqpConnection connection) throws IOException;
+
+   /**
+    * Called when the Proton Engine signals a Flow related event has been triggered
+    * for the given endpoint.
+    *
+    * @param connection the AmqpConnection instance for easier access to fire events.
+    * @throws IOException if an error occurs while processing the update.
+    */
+   void processFlowUpdates(AmqpConnection connection) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java
new file mode 100644
index 0000000..adf5df6
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_CODE;
+
+/**
+ * A Described Type wrapper for JMS selector values: descriptor is JMS_SELECTOR_CODE, described value is the selector.
+ */
+public class AmqpJmsSelectorFilter implements DescribedType {
+
+   private final String selector; // stored exactly as passed to the constructor; no validation is performed
+
+   public AmqpJmsSelectorFilter(String selector) {
+      this.selector = selector;
+   }
+
+   @Override
+   public Object getDescriptor() {
+      return JMS_SELECTOR_CODE;
+   }
+
+   @Override
+   public Object getDescribed() {
+      return selector;
+   }
+
+   @Override
+   public String toString() {
+      return "AmqpJmsSelectorFilter{" + selector + "}"; // label matches the class name
+   }
+}

Reply via email to