http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
new file mode 100644
index 0000000..28e38f2
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Session class that manages a Proton session endpoint.
+ */
+public class AmqpSession extends AmqpAbstractResource<Session> {
+
+   private final AtomicLong receiverIdGenerator = new AtomicLong();
+   private final AtomicLong senderIdGenerator = new AtomicLong();
+
+   private final AmqpConnection connection;
+   private final String sessionId;
+   private final AmqpTransactionContext txContext;
+
+   /**
+    * Create a new session instance.
+    *
+    * @param connection The parent connection that created the session.
+    * @param sessionId  The unique ID value assigned to this session.
+    */
+   public AmqpSession(AmqpConnection connection, String sessionId) {
+      this.connection = connection;
+      this.sessionId = sessionId;
+      this.txContext = new AmqpTransactionContext(this);
+   }
+
+   /**
+    * Create a sender instance using the given address
+    *
+    * @param address the address to which the sender will produce its messages.
+    * @return a newly created sender that is ready for use.
+    * @throws Exception if an error occurs while creating the sender.
+    */
+   public AmqpSender createSender(final String address) throws Exception {
+      return createSender(address, false);
+   }
+
+   /**
+    * Create a sender instance using the given address
+    *
+    * @param address   the address to which the sender will produce its 
messages.
+    * @param presettle controls if the created sender produces message that 
have already been marked settled.
+    * @return a newly created sender that is ready for use.
+    * @throws Exception if an error occurs while creating the sender.
+    */
+   public AmqpSender createSender(final String address, boolean presettle) 
throws Exception {
+      checkClosed();
+
+      final AmqpSender sender = new AmqpSender(AmqpSession.this, address, 
getNextSenderId());
+      sender.setPresettle(presettle);
+      final ClientFuture request = new ClientFuture();
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            sender.setStateInspector(getStateInspector());
+            sender.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return sender;
+   }
+
+   /**
+    * Create a sender instance using the given Target
+    *
+    * @param target the caller created and configured Traget used to create 
the sender link.
+    * @return a newly created sender that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpSender createSender(Target target) throws Exception {
+      checkClosed();
+
+      final AmqpSender sender = new AmqpSender(AmqpSession.this, target, 
getNextSenderId());
+      final ClientFuture request = new ClientFuture();
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            sender.setStateInspector(getStateInspector());
+            sender.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return sender;
+   }
+
+   /**
+    * Create a receiver instance using the given address
+    *
+    * @param address the address to which the receiver will subscribe for its 
messages.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createReceiver(String address) throws Exception {
+      return createReceiver(address, null, false);
+   }
+
+   /**
+    * Create a receiver instance using the given address
+    *
+    * @param address  the address to which the receiver will subscribe for its 
messages.
+    * @param selector the JMS selector to use for the subscription
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createReceiver(String address, String selector) throws 
Exception {
+      return createReceiver(address, selector, false);
+   }
+
+   /**
+    * Create a receiver instance using the given address
+    *
+    * @param address  the address to which the receiver will subscribe for its 
messages.
+    * @param selector the JMS selector to use for the subscription
+    * @param noLocal  should the subscription have messages from its 
connection filtered.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createReceiver(String address, String selector, boolean 
noLocal) throws Exception {
+      return createReceiver(address, selector, noLocal, false);
+   }
+
+   /**
+    * Create a receiver instance using the given address
+    *
+    * @param address   the address to which the receiver will subscribe for 
its messages.
+    * @param selector  the JMS selector to use for the subscription
+    * @param noLocal   should the subscription have messages from its 
connection filtered.
+    * @param presettle should the receiver be created with a settled sender 
mode.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createReceiver(String address,
+                                      String selector,
+                                      boolean noLocal,
+                                      boolean presettle) throws Exception {
+      checkClosed();
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, 
address, getNextReceiverId());
+
+      receiver.setNoLocal(noLocal);
+      receiver.setPresettle(presettle);
+      if (selector != null && !selector.isEmpty()) {
+         receiver.setSelector(selector);
+      }
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return receiver;
+   }
+
+   /**
+    * Create a receiver instance using the given Source
+    *
+    * @param source the caller created and configured Source used to create 
the receiver link.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createReceiver(Source source) throws Exception {
+      checkClosed();
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, 
getNextReceiverId());
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return receiver;
+   }
+
+   /**
+    * Create a receiver instance using the given address that creates a 
durable subscription.
+    *
+    * @param address          the address to which the receiver will subscribe 
for its messages.
+    * @param subscriptionName the name of the subscription that is being 
created.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createDurableReceiver(String address, String 
subscriptionName) throws Exception {
+      return createDurableReceiver(address, subscriptionName, null, false);
+   }
+
+   /**
+    * Create a receiver instance using the given address that creates a 
durable subscription.
+    *
+    * @param address          the address to which the receiver will subscribe 
for its messages.
+    * @param subscriptionName the name of the subscription that is being 
created.
+    * @param selector         the JMS selector to use for the subscription
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createDurableReceiver(String address,
+                                             String subscriptionName,
+                                             String selector) throws Exception 
{
+      return createDurableReceiver(address, subscriptionName, selector, false);
+   }
+
+   /**
+    * Create a receiver instance using the given address that creates a 
durable subscription.
+    *
+    * @param address          the address to which the receiver will subscribe 
for its messages.
+    * @param subscriptionName the name of the subscription that is being 
created.
+    * @param selector         the JMS selector to use for the subscription
+    * @param noLocal          should the subscription have messages from its 
connection filtered.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createDurableReceiver(String address,
+                                             String subscriptionName,
+                                             String selector,
+                                             boolean noLocal) throws Exception 
{
+      checkClosed();
+
+      if (subscriptionName == null || subscriptionName.isEmpty()) {
+         throw new IllegalArgumentException("subscription name must not be 
null or empty.");
+      }
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, 
address, getNextReceiverId());
+      receiver.setSubscriptionName(subscriptionName);
+      receiver.setNoLocal(noLocal);
+      if (selector != null && !selector.isEmpty()) {
+         receiver.setSelector(selector);
+      }
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return receiver;
+   }
+
+   /**
+    * Create a receiver instance using the given address that creates a 
durable subscription.
+    *
+    * @param subscriptionName the name of the subscription that should be 
queried for on the remote..
+    * @return a newly created receiver that is ready for use if the 
subscription exists.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver lookupSubscription(String subscriptionName) throws 
Exception {
+      checkClosed();
+
+      if (subscriptionName == null || subscriptionName.isEmpty()) {
+         throw new IllegalArgumentException("subscription name must not be 
null or empty.");
+      }
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, 
(String) null, getNextReceiverId());
+      receiver.setSubscriptionName(subscriptionName);
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return receiver;
+   }
+
+   /**
+    * @return this session's parent AmqpConnection.
+    */
+   public AmqpConnection getConnection() {
+      return connection;
+   }
+
+   public Session getSession() {
+      return new UnmodifiableSession(getEndpoint());
+   }
+
+   public boolean isInTransaction() {
+      return txContext.isInTransaction();
+   }
+
+   @Override
+   public String toString() {
+      return "AmqpSession { " + sessionId + " }";
+   }
+
+   //----- Session Transaction Methods --------------------------------------//
+
+   /**
+    * Starts a new transaction associated with this session.
+    *
+    * @throws Exception if an error occurs starting a new Transaction.
+    */
+   public void begin() throws Exception {
+      if (txContext.isInTransaction()) {
+         throw new javax.jms.IllegalStateException("Session already has an 
active transaction");
+      }
+
+      txContext.begin();
+   }
+
+   /**
+    * Commit the current transaction associated with this session.
+    *
+    * @throws Exception if an error occurs committing the Transaction.
+    */
+   public void commit() throws Exception {
+      if (!txContext.isInTransaction()) {
+         throw new javax.jms.IllegalStateException("Commit called on Session 
that does not have an active transaction");
+      }
+
+      txContext.commit();
+   }
+
+   /**
+    * Roll back the current transaction associated with this session.
+    *
+    * @throws Exception if an error occurs rolling back the Transaction.
+    */
+   public void rollback() throws Exception {
+      if (!txContext.isInTransaction()) {
+         throw new javax.jms.IllegalStateException("Rollback called on Session 
that does not have an active transaction");
+      }
+
+      txContext.rollback();
+   }
+
+   //----- Internal access used to manage resources -------------------------//
+
+   ScheduledExecutorService getScheduler() {
+      return connection.getScheduler();
+   }
+
+   Connection getProtonConnection() {
+      return connection.getProtonConnection();
+   }
+
+   void pumpToProtonTransport(AsyncResult request) {
+      connection.pumpToProtonTransport(request);
+   }
+
+   AmqpTransactionId getTransactionId() {
+      return txContext.getTransactionId();
+   }
+
+   AmqpTransactionContext getTransactionContext() {
+      return txContext;
+   }
+
+   //----- Private implementation details -----------------------------------//
+
+   @Override
+   protected void doOpenInspection() {
+      try {
+         getStateInspector().inspectOpenedResource(getSession());
+      }
+      catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
+   @Override
+   protected void doClosedInspection() {
+      try {
+         getStateInspector().inspectClosedResource(getSession());
+      }
+      catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
+   private String getNextSenderId() {
+      return sessionId + ":" + senderIdGenerator.incrementAndGet();
+   }
+
+   private String getNextReceiverId() {
+      return sessionId + ":" + receiverIdGenerator.incrementAndGet();
+   }
+
+   private void checkClosed() {
+      if (isClosed() || connection.isClosed()) {
+         throw new IllegalStateException("Session is already closed");
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java
new file mode 100644
index 0000000..c9ee57b
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import javax.jms.InvalidClientIDException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.ResourceAllocationException;
+import javax.jms.TransactionRolledBackException;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.TransactionErrors;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ConnectionError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+
+public class AmqpSupport {
+
+   // Symbols used for connection capabilities
+   public static final Symbol SOLE_CONNECTION_CAPABILITY = 
Symbol.valueOf("sole-connection-for-container");
+   public static final Symbol ANONYMOUS_RELAY = 
Symbol.valueOf("ANONYMOUS-RELAY");
+
+   // Symbols used to announce connection error information
+   public static final Symbol CONNECTION_OPEN_FAILED = 
Symbol.valueOf("amqp:connection-establishment-failed");
+   public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
+   public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id");
+
+   // Symbols used to announce connection redirect ErrorCondition 'info'
+   public static final Symbol PORT = Symbol.valueOf("port");
+   public static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
+   public static final Symbol OPEN_HOSTNAME = Symbol.valueOf("hostname");
+
+   // Symbols used for connection properties
+   public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+   public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
+
+   public static final Symbol PRODUCT = Symbol.valueOf("product");
+   public static final Symbol VERSION = Symbol.valueOf("version");
+   public static final Symbol PLATFORM = Symbol.valueOf("platform");
+
+   // Symbols used for receivers.
+   public static final Symbol COPY = Symbol.getSymbol("copy");
+   public static final Symbol NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
+   public static final Symbol SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
+
+   // Delivery states
+   public static final Rejected REJECTED = new Rejected();
+   public static final Modified MODIFIED_FAILED = new Modified();
+   public static final Modified MODIFIED_FAILED_UNDELIVERABLE = new Modified();
+
+   // Temporary Destination constants
+   public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = 
Symbol.valueOf("lifetime-policy");
+   public static final String TEMP_QUEUE_CREATOR = "temp-queue-creator:";
+   public static final String TEMP_TOPIC_CREATOR = "temp-topic-creator:";
+
+   //----- Static initializer -----------------------------------------------//
+
+   static {
+      MODIFIED_FAILED.setDeliveryFailed(true);
+
+      MODIFIED_FAILED_UNDELIVERABLE.setDeliveryFailed(true);
+      MODIFIED_FAILED_UNDELIVERABLE.setUndeliverableHere(true);
+   }
+
+   //----- Utility Methods --------------------------------------------------//
+
+   /**
+    * Given an ErrorCondition instance create a new Exception that best matches
+    * the error type.
+    *
+    * @param errorCondition The ErrorCondition returned from the remote peer.
+    * @return a new Exception instance that best matches the ErrorCondition 
value.
+    */
+   public static Exception convertToException(ErrorCondition errorCondition) {
+      Exception remoteError = null;
+
+      if (errorCondition != null && errorCondition.getCondition() != null) {
+         Symbol error = errorCondition.getCondition();
+         String message = extractErrorMessage(errorCondition);
+
+         if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
+            remoteError = new JMSSecurityException(message);
+         }
+         else if (error.equals(AmqpError.RESOURCE_LIMIT_EXCEEDED)) {
+            remoteError = new ResourceAllocationException(message);
+         }
+         else if (error.equals(AmqpError.NOT_FOUND)) {
+            remoteError = new InvalidDestinationException(message);
+         }
+         else if (error.equals(TransactionErrors.TRANSACTION_ROLLBACK)) {
+            remoteError = new TransactionRolledBackException(message);
+         }
+         else if (error.equals(ConnectionError.REDIRECT)) {
+            remoteError = createRedirectException(error, message, 
errorCondition);
+         }
+         else if (error.equals(AmqpError.INVALID_FIELD)) {
+            Map<?, ?> info = errorCondition.getInfo();
+            if (info != null && CONTAINER_ID.equals(info.get(INVALID_FIELD))) {
+               remoteError = new InvalidClientIDException(message);
+            }
+            else {
+               remoteError = new JMSException(message);
+            }
+         }
+         else {
+            remoteError = new JMSException(message);
+         }
+      }
+      else {
+         remoteError = new JMSException("Unknown error from remote peer");
+      }
+
+      return remoteError;
+   }
+
+   /**
+    * Attempt to read and return the embedded error message in the given 
ErrorCondition
+    * object.  If no message can be extracted a generic message is returned.
+    *
+    * @param errorCondition The ErrorCondition to extract the error message 
from.
+    * @return an error message extracted from the given ErrorCondition.
+    */
+   public static String extractErrorMessage(ErrorCondition errorCondition) {
+      String message = "Received error from remote peer without description";
+      if (errorCondition != null) {
+         if (errorCondition.getDescription() != null && 
!errorCondition.getDescription().isEmpty()) {
+            message = errorCondition.getDescription();
+         }
+
+         Symbol condition = errorCondition.getCondition();
+         if (condition != null) {
+            message = message + " [condition = " + condition + "]";
+         }
+      }
+
+      return message;
+   }
+
+   /**
+    * When a redirect type exception is received this method is called to 
create the
+    * appropriate redirect exception type containing the error details needed.
+    *
+    * @param error     the Symbol that defines the redirection error type.
+    * @param message   the basic error message that should used or amended for 
the returned exception.
+    * @param condition the ErrorCondition that describes the redirection.
+    * @return an Exception that captures the details of the redirection error.
+    */
+   public static Exception createRedirectException(Symbol error, String 
message, ErrorCondition condition) {
+      Exception result = null;
+      Map<?, ?> info = condition.getInfo();
+
+      if (info == null) {
+         result = new IOException(message + " : Redirection information not 
set.");
+      }
+      else {
+         String hostname = (String) info.get(OPEN_HOSTNAME);
+
+         String networkHost = (String) info.get(NETWORK_HOST);
+         if (networkHost == null || networkHost.isEmpty()) {
+            result = new IOException(message + " : Redirection information not 
set.");
+         }
+
+         int port = 0;
+         try {
+            port = Integer.valueOf(info.get(PORT).toString());
+         }
+         catch (Exception ex) {
+            result = new IOException(message + " : Redirection information not 
set.");
+         }
+
+         result = new AmqpRedirectedException(message, hostname, networkHost, 
port);
+      }
+
+      return result;
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
new file mode 100644
index 0000000..dcf23d2
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import 
org.apache.activemq.transport.amqp.client.util.ClientFutureSynchronization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines a context under which resources in a given session
+ * will operate inside transaction scoped boundaries.
+ */
+public class AmqpTransactionContext {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AmqpTransactionContext.class);
+
+   private final AmqpSession session;
+   private final Set<AmqpReceiver> txReceivers = new LinkedHashSet<>();
+
+   private AmqpTransactionCoordinator coordinator;
+   private AmqpTransactionId transactionId;
+
+   public AmqpTransactionContext(AmqpSession session) {
+      this.session = session;
+   }
+
+   /**
+    * Begins a new transaction scoped to the target session.
+    *
+    * @param txId The transaction Id to use for this new transaction.
+    * @throws Exception if an error occurs while starting the transaction.
+    */
+   public void begin() throws Exception {
+      if (transactionId != null) {
+         throw new IOException("Begin called while a TX is still Active.");
+      }
+
+      final AmqpTransactionId txId = 
session.getConnection().getNextTransactionId();
+      final ClientFuture request = new ClientFuture(new 
ClientFutureSynchronization() {
+
+         @Override
+         public void onPendingSuccess() {
+            transactionId = txId;
+         }
+
+         @Override
+         public void onPendingFailure(Throwable cause) {
+            transactionId = null;
+         }
+      });
+
+      LOG.info("Attempting to Begin TX:[{}]", txId);
+
+      session.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            if (coordinator == null || coordinator.isClosed()) {
+               LOG.info("Creating new Coordinator for TX:[{}]", txId);
+               coordinator = new AmqpTransactionCoordinator(session);
+               coordinator.open(new AsyncResult() {
+
+                  @Override
+                  public void onSuccess() {
+                     try {
+                        LOG.info("Attempting to declare TX:[{}]", txId);
+                        coordinator.declare(txId, request);
+                     }
+                     catch (Exception e) {
+                        request.onFailure(e);
+                     }
+                  }
+
+                  @Override
+                  public void onFailure(Throwable result) {
+                     request.onFailure(result);
+                  }
+
+                  @Override
+                  public boolean isComplete() {
+                     return request.isComplete();
+                  }
+               });
+            }
+            else {
+               try {
+                  LOG.info("Attempting to declare TX:[{}]", txId);
+                  coordinator.declare(txId, request);
+               }
+               catch (Exception e) {
+                  request.onFailure(e);
+               }
+            }
+
+            session.pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+   }
+
+   /**
+    * Commit this transaction which then ends the lifetime of the transacted 
operation.
+    *
+    * @throws Exception if an error occurs while performing the commit
+    */
+   public void commit() throws Exception {
+      if (transactionId == null) {
+         throw new IllegalStateException("Commit called with no active 
Transaction.");
+      }
+
+      preCommit();
+
+      final ClientFuture request = new ClientFuture(new 
ClientFutureSynchronization() {
+
+         @Override
+         public void onPendingSuccess() {
+            transactionId = null;
+            postCommit();
+         }
+
+         @Override
+         public void onPendingFailure(Throwable cause) {
+            transactionId = null;
+            postCommit();
+         }
+      });
+
+      LOG.debug("Commit on TX[{}] initiated", transactionId);
+      session.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               LOG.info("Attempting to commit TX:[{}]", transactionId);
+               coordinator.discharge(transactionId, request, true);
+               session.pumpToProtonTransport(request);
+            }
+            catch (Exception e) {
+               request.onFailure(e);
+            }
+         }
+      });
+
+      request.sync();
+   }
+
+   /**
+    * Rollback any transacted work performed under the current transaction.
+    *
+    * @throws Exception if an error occurs during the rollback operation.
+    */
+   public void rollback() throws Exception {
+      if (transactionId == null) {
+         throw new IllegalStateException("Rollback called with no active 
Transaction.");
+      }
+
+      preRollback();
+
+      final ClientFuture request = new ClientFuture(new 
ClientFutureSynchronization() {
+
+         @Override
+         public void onPendingSuccess() {
+            transactionId = null;
+            postRollback();
+         }
+
+         @Override
+         public void onPendingFailure(Throwable cause) {
+            transactionId = null;
+            postRollback();
+         }
+      });
+
+      LOG.debug("Rollback on TX[{}] initiated", transactionId);
+      session.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               LOG.info("Attempting to roll back TX:[{}]", transactionId);
+               coordinator.discharge(transactionId, request, false);
+               session.pumpToProtonTransport(request);
+            }
+            catch (Exception e) {
+               request.onFailure(e);
+            }
+         }
+      });
+
+      request.sync();
+   }
+
+   //----- Internal access to context properties ----------------------------//
+
+   AmqpTransactionCoordinator getCoordinator() {
+      return coordinator;
+   }
+
+   AmqpTransactionId getTransactionId() {
+      return transactionId;
+   }
+
+   boolean isInTransaction() {
+      return transactionId != null;
+   }
+
+   void registerTxConsumer(AmqpReceiver consumer) {
+      txReceivers.add(consumer);
+   }
+
+   //----- Transaction pre / post completion --------------------------------//
+
+   private void preCommit() {
+      for (AmqpReceiver receiver : txReceivers) {
+         receiver.preCommit();
+      }
+   }
+
+   private void preRollback() {
+      for (AmqpReceiver receiver : txReceivers) {
+         receiver.preRollback();
+      }
+   }
+
+   private void postCommit() {
+      for (AmqpReceiver receiver : txReceivers) {
+         receiver.postCommit();
+      }
+
+      txReceivers.clear();
+   }
+
+   private void postRollback() {
+      for (AmqpReceiver receiver : txReceivers) {
+         receiver.postRollback();
+      }
+
+      txReceivers.clear();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
new file mode 100644
index 0000000..aded162
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.TransactionRolledBackException;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the AMQP Transaction coordinator link used by the transaction 
context
+ * of a session to control the lifetime of a given transaction.
+ */
+public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
+
+   private final byte[] OUTBOUND_BUFFER = new byte[64];
+
+   private final AmqpSession session;
+   private final AmqpTransferTagGenerator tagGenerator = new 
AmqpTransferTagGenerator();
+
+   private List<Delivery> pendingDeliveries = new LinkedList<>();
+   private Map<AmqpTransactionId, AsyncResult> pendingRequests = new 
HashMap<>();
+
+   public AmqpTransactionCoordinator(AmqpSession session) {
+      this.session = session;
+   }
+
+   @Override
+   public void processDeliveryUpdates(AmqpConnection connection) throws 
IOException {
+      try {
+         Iterator<Delivery> deliveries = pendingDeliveries.iterator();
+         while (deliveries.hasNext()) {
+            Delivery pendingDelivery = deliveries.next();
+            if (!pendingDelivery.remotelySettled()) {
+               continue;
+            }
+
+            DeliveryState state = pendingDelivery.getRemoteState();
+            AmqpTransactionId txId = (AmqpTransactionId) 
pendingDelivery.getContext();
+            AsyncResult pendingRequest = pendingRequests.get(txId);
+
+            if (pendingRequest == null) {
+               throw new IllegalStateException("Pending tx operation with no 
pending request");
+            }
+
+            if (state instanceof Declared) {
+               LOG.debug("New TX started: {}", txId.getTxId());
+               Declared declared = (Declared) state;
+               txId.setRemoteTxId(declared.getTxnId());
+               pendingRequest.onSuccess();
+            }
+            else if (state instanceof Rejected) {
+               LOG.debug("Last TX request failed: {}", txId.getTxId());
+               Rejected rejected = (Rejected) state;
+               Exception cause = 
AmqpSupport.convertToException(rejected.getError());
+               JMSException failureCause = null;
+               if (txId.isCommit()) {
+                  failureCause = new 
TransactionRolledBackException(cause.getMessage());
+               }
+               else {
+                  failureCause = new JMSException(cause.getMessage());
+               }
+
+               pendingRequest.onFailure(failureCause);
+            }
+            else {
+               LOG.debug("Last TX request succeeded: {}", txId.getTxId());
+               pendingRequest.onSuccess();
+            }
+
+            // Clear state data
+            pendingDelivery.settle();
+            pendingRequests.remove(txId);
+            deliveries.remove();
+         }
+
+         super.processDeliveryUpdates(connection);
+      }
+      catch (Exception e) {
+         throw IOExceptionSupport.create(e);
+      }
+   }
+
+   public void declare(AmqpTransactionId txId, AsyncResult request) throws 
Exception {
+      if (txId.getRemoteTxId() != null) {
+         throw new IllegalStateException("Declar called while a TX is still 
Active.");
+      }
+
+      if (isClosed()) {
+         request.onFailure(new JMSException("Cannot start new transaction: 
Coordinator remotely closed"));
+         return;
+      }
+
+      Message message = Message.Factory.create();
+      Declare declare = new Declare();
+      message.setBody(new AmqpValue(declare));
+
+      Delivery pendingDelivery = 
getEndpoint().delivery(tagGenerator.getNextTag());
+      pendingDelivery.setContext(txId);
+
+      // Store away for completion
+      pendingDeliveries.add(pendingDelivery);
+      pendingRequests.put(txId, request);
+
+      sendTxCommand(message);
+   }
+
+   public void discharge(AmqpTransactionId txId, AsyncResult request, boolean 
commit) throws Exception {
+
+      if (isClosed()) {
+         Exception failureCause = null;
+
+         if (commit) {
+            failureCause = new TransactionRolledBackException("Transaction 
inbout: Coordinator remotely closed");
+         }
+         else {
+            failureCause = new JMSException("Rollback cannot complete: 
Coordinator remotely closed");
+         }
+
+         request.onFailure(failureCause);
+         return;
+      }
+
+      // Store the context of this action in the transaction ID for later 
completion.
+      txId.setState(commit ? AmqpTransactionId.COMMIT_MARKER : 
AmqpTransactionId.ROLLBACK_MARKER);
+
+      Message message = Message.Factory.create();
+      Discharge discharge = new Discharge();
+      discharge.setFail(!commit);
+      discharge.setTxnId(txId.getRemoteTxId());
+      message.setBody(new AmqpValue(discharge));
+
+      Delivery pendingDelivery = 
getEndpoint().delivery(tagGenerator.getNextTag());
+      pendingDelivery.setContext(txId);
+
+      // Store away for completion
+      pendingDeliveries.add(pendingDelivery);
+      pendingRequests.put(txId, request);
+
+      sendTxCommand(message);
+   }
+
+   //----- Base class overrides ---------------------------------------------//
+
+   @Override
+   public void remotelyClosed(AmqpConnection connection) {
+
+      Exception txnError = 
AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
+
+      // Alert any pending operation that the link failed to complete the 
pending
+      // begin / commit / rollback operation.
+      for (AsyncResult pendingRequest : pendingRequests.values()) {
+         pendingRequest.onFailure(txnError);
+      }
+
+      // Purge linkages to pending operations.
+      pendingDeliveries.clear();
+      pendingRequests.clear();
+
+      // Override the base class version because we do not want to propagate
+      // an error up to the client if remote close happens as that is an
+      // acceptable way for the remote to indicate the discharge could not
+      // be applied.
+
+      if (getEndpoint() != null) {
+         getEndpoint().close();
+         getEndpoint().free();
+      }
+
+      LOG.debug("Transaction Coordinator link {} was remotely closed", 
getEndpoint());
+   }
+
+   //----- Internal implementation ------------------------------------------//
+
+   private void sendTxCommand(Message message) throws IOException {
+      int encodedSize = 0;
+      byte[] buffer = OUTBOUND_BUFFER;
+      while (true) {
+         try {
+            encodedSize = message.encode(buffer, 0, buffer.length);
+            break;
+         }
+         catch (BufferOverflowException e) {
+            buffer = new byte[buffer.length * 2];
+         }
+      }
+
+      Sender sender = getEndpoint();
+      sender.send(buffer, 0, encodedSize);
+      sender.advance();
+   }
+
+   @Override
+   protected void doOpen() {
+      Coordinator coordinator = new Coordinator();
+      coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
+      Source source = new Source();
+
+      String coordinatorName = "qpid-jms:coordinator:" + 
session.getConnection().getConnectionId();
+
+      Sender sender = session.getEndpoint().sender(coordinatorName);
+      sender.setSource(source);
+      sender.setTarget(coordinator);
+      sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+      sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+      setEndpoint(sender);
+
+      super.doOpen();
+   }
+
+   @Override
+   protected void doOpenInspection() {
+      // TODO
+   }
+
+   @Override
+   protected void doClosedInspection() {
+      // TODO
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java
new file mode 100644
index 0000000..5dcdfe1
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.Binary;
+
+/**
+ * Wrapper For Transaction state in identification
+ */
+public class AmqpTransactionId {
+
+   public static final int DECLARE_MARKER = 1;
+   public static final int ROLLBACK_MARKER = 2;
+   public static final int COMMIT_MARKER = 3;
+
+   private final String txId;
+   private Binary remoteTxId;
+   private int state = DECLARE_MARKER;
+
+   public AmqpTransactionId(String txId) {
+      this.txId = txId;
+   }
+
+   public boolean isDeclare() {
+      return state == DECLARE_MARKER;
+   }
+
+   public boolean isCommit() {
+      return state == COMMIT_MARKER;
+   }
+
+   public boolean isRollback() {
+      return state == ROLLBACK_MARKER;
+   }
+
+   public void setState(int state) {
+      this.state = state;
+   }
+
+   public String getTxId() {
+      return txId;
+   }
+
+   public Binary getRemoteTxId() {
+      return remoteTxId;
+   }
+
+   public void setRemoteTxId(Binary remoteTxId) {
+      this.remoteTxId = remoteTxId;
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((txId == null) ? 0 : txId.hashCode());
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) {
+         return true;
+      }
+      if (obj == null) {
+         return false;
+      }
+      if (getClass() != obj.getClass()) {
+         return false;
+      }
+
+      AmqpTransactionId other = (AmqpTransactionId) obj;
+      if (txId == null) {
+         if (other.txId != null) {
+            return false;
+         }
+      }
+      else if (!txId.equals(other.txId)) {
+         return false;
+      }
+
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
new file mode 100644
index 0000000..85ee07f
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * Utility class that can generate and if enabled pool the binary tag values
+ * used to identify transfers over an AMQP link.
+ */
+public final class AmqpTransferTagGenerator {
+
+   public static final int DEFAULT_TAG_POOL_SIZE = 1024;
+
+   private long nextTagId;
+   private int maxPoolSize = DEFAULT_TAG_POOL_SIZE;
+
+   private final Set<byte[]> tagPool;
+
+   public AmqpTransferTagGenerator() {
+      this(false);
+   }
+
+   public AmqpTransferTagGenerator(boolean pool) {
+      if (pool) {
+         this.tagPool = new LinkedHashSet<>();
+      }
+      else {
+         this.tagPool = null;
+      }
+   }
+
+   /**
+    * Retrieves the next available tag.
+    *
+    * @return a new or unused tag depending on the pool option.
+    */
+   public byte[] getNextTag() {
+      byte[] rc;
+      if (tagPool != null && !tagPool.isEmpty()) {
+         final Iterator<byte[]> iterator = tagPool.iterator();
+         rc = iterator.next();
+         iterator.remove();
+      }
+      else {
+         try {
+            rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+         }
+         catch (UnsupportedEncodingException e) {
+            // This should never happen since we control the input.
+            throw new RuntimeException(e);
+         }
+      }
+      return rc;
+   }
+
+   /**
+    * When used as a pooled cache of tags the unused tags should always be 
returned once
+    * the transfer has been settled.
+    *
+    * @param data a previously borrowed tag that is no longer in use.
+    */
+   public void returnTag(byte[] data) {
+      if (tagPool != null && tagPool.size() < maxPoolSize) {
+         tagPool.add(data);
+      }
+   }
+
+   /**
+    * Gets the current max pool size value.
+    *
+    * @return the current max tag pool size.
+    */
+   public int getMaxPoolSize() {
+      return maxPoolSize;
+   }
+
+   /**
+    * Sets the max tag pool size.  If the size is smaller than the current 
number
+    * of pooled tags the pool will drain over time until it matches the max.
+    *
+    * @param maxPoolSize the maximum number of tags to hold in the pool.
+    */
+   public void setMaxPoolSize(int maxPoolSize) {
+      this.maxPoolSize = maxPoolSize;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java
new file mode 100644
index 0000000..8a4ce6b
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * A Described Type wrapper for an unsupported filter that the broker should 
ignore.
+ */
+public class AmqpUnknownFilterType implements DescribedType {
+
+   public static final AmqpUnknownFilterType UNKOWN_FILTER = new 
AmqpUnknownFilterType();
+
+   public static final UnsignedLong UNKNOWN_FILTER_CODE = 
UnsignedLong.valueOf(0x0000468C00000099L);
+   public static final Symbol UNKNOWN_FILTER_NAME = 
Symbol.valueOf("apache.org:unkown-filter:string");
+   public static final Object[] UNKNOWN_FILTER_IDS = new 
Object[]{UNKNOWN_FILTER_CODE, UNKNOWN_FILTER_NAME};
+
+   private final String payload;
+
+   public AmqpUnknownFilterType() {
+      this.payload = "UnknownFilter{}";
+   }
+
+   @Override
+   public Object getDescriptor() {
+      return UNKNOWN_FILTER_CODE;
+   }
+
+   @Override
+   public Object getDescribed() {
+      return this.payload;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
new file mode 100644
index 0000000..5f46cb6
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Abstract base for a validation hook that is used in tests to check
+ * the state of a remote resource after a variety of lifecycle events.
+ */
+public class AmqpValidator {
+
+   private boolean valid = true;
+   private String errorMessage;
+
+   public void inspectOpenedResource(Connection connection) {
+
+   }
+
+   public void inspectOpenedResource(Session session) {
+
+   }
+
+   public void inspectOpenedResource(Sender sender) {
+
+   }
+
+   public void inspectOpenedResource(Receiver receiver) {
+
+   }
+
+   public void inspectClosedResource(Connection remoteConnection) {
+
+   }
+
+   public void inspectClosedResource(Session session) {
+
+   }
+
+   public void inspectClosedResource(Sender sender) {
+
+   }
+
+   public void inspectClosedResource(Receiver receiver) {
+
+   }
+
+   public void inspectDetachedResource(Sender sender) {
+
+   }
+
+   public void inspectDetachedResource(Receiver receiver) {
+
+   }
+
+   public boolean isValid() {
+      return valid;
+   }
+
+   protected void setValid(boolean valid) {
+      this.valid = valid;
+   }
+
+   public String getErrorMessage() {
+      return errorMessage;
+   }
+
+   protected void setErrorMessage(String errorMessage) {
+      this.errorMessage = errorMessage;
+   }
+
+   protected void markAsInvalid(String errorMessage) {
+      if (valid) {
+         setValid(false);
+         setErrorMessage(errorMessage);
+      }
+   }
+
+   public void assertValid() {
+      if (!isValid()) {
+         throw new AssertionError(errorMessage);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
new file mode 100644
index 0000000..011fba7
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for SASL Authentication Mechanism that implements the basic
+ * methods of a Mechanism class.
+ */
+public abstract class AbstractMechanism implements Mechanism {
+
+   protected static final byte[] EMPTY = new byte[0];
+
+   private String username;
+   private String password;
+   private String authzid;
+   private Map<String, Object> properties = new HashMap<>();
+
+   @Override
+   public int compareTo(Mechanism other) {
+
+      if (getPriority() < other.getPriority()) {
+         return -1;
+      }
+      else if (getPriority() > other.getPriority()) {
+         return 1;
+      }
+
+      return 0;
+   }
+
+   @Override
+   public void setUsername(String value) {
+      this.username = value;
+   }
+
+   @Override
+   public String getUsername() {
+      return username;
+   }
+
+   @Override
+   public void setPassword(String value) {
+      this.password = value;
+   }
+
+   @Override
+   public String getPassword() {
+      return this.password;
+   }
+
+   @Override
+   public void setProperties(Map<String, Object> properties) {
+      this.properties = properties;
+   }
+
+   @Override
+   public Map<String, Object> getProperties() {
+      return this.properties;
+   }
+
+   @Override
+   public String toString() {
+      return "SASL-" + getName();
+   }
+
+   @Override
+   public String getAuthzid() {
+      return authzid;
+   }
+
+   @Override
+   public void setAuthzid(String authzid) {
+      this.authzid = authzid;
+   }
+
+   @Override
+   public boolean isApplicable(String username, String password) {
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
new file mode 100644
index 0000000..c3d36aa
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+/**
+ * Implements the Anonymous SASL authentication mechanism.
+ */
+public class AnonymousMechanism extends AbstractMechanism {
+
+   @Override
+   public byte[] getInitialResponse() {
+      return EMPTY;
+   }
+
+   @Override
+   public byte[] getChallengeResponse(byte[] challenge) {
+      return EMPTY;
+   }
+
+   @Override
+   public int getPriority() {
+      return PRIORITY.LOWEST.getValue();
+   }
+
+   @Override
+   public String getName() {
+      return "ANONYMOUS";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
new file mode 100644
index 0000000..4821314
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.security.sasl.SaslException;
+import java.io.UnsupportedEncodingException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * Implements the SASL PLAIN authentication Mechanism.
+ *
+ * User name and Password values are sent without being encrypted.
+ */
+public class CramMD5Mechanism extends AbstractMechanism {
+
+   private static final String ASCII = "ASCII";
+   private static final String HMACMD5 = "HMACMD5";
+   private boolean sentResponse;
+
+   @Override
+   public int getPriority() {
+      return PRIORITY.HIGH.getValue();
+   }
+
+   @Override
+   public String getName() {
+      return "CRAM-MD5";
+   }
+
+   @Override
+   public byte[] getInitialResponse() {
+      return EMPTY;
+   }
+
+   @Override
+   public byte[] getChallengeResponse(byte[] challenge) throws SaslException {
+      if (!sentResponse && challenge != null && challenge.length != 0) {
+         try {
+            SecretKeySpec key = new 
SecretKeySpec(getPassword().getBytes(ASCII), HMACMD5);
+            Mac mac = Mac.getInstance(HMACMD5);
+            mac.init(key);
+
+            byte[] bytes = mac.doFinal(challenge);
+
+            StringBuffer hash = new StringBuffer(getUsername());
+            hash.append(' ');
+            for (int i = 0; i < bytes.length; i++) {
+               String hex = Integer.toHexString(0xFF & bytes[i]);
+               if (hex.length() == 1) {
+                  hash.append('0');
+               }
+               hash.append(hex);
+            }
+
+            sentResponse = true;
+            return hash.toString().getBytes(ASCII);
+         }
+         catch (UnsupportedEncodingException e) {
+            throw new SaslException("Unable to utilise required encoding", e);
+         }
+         catch (InvalidKeyException e) {
+            throw new SaslException("Unable to utilise key", e);
+         }
+         catch (NoSuchAlgorithmException e) {
+            throw new SaslException("Unable to utilise required algorithm", e);
+         }
+      }
+      else {
+         return EMPTY;
+      }
+   }
+
+   @Override
+   public boolean isApplicable(String username, String password) {
+      return username != null && username.length() > 0 && password != null && 
password.length() > 0;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
new file mode 100644
index 0000000..a79406f
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import javax.security.sasl.SaslException;
+import java.util.Map;
+
+/**
+ * Interface for all SASL authentication mechanism implementations.
+ */
+public interface Mechanism extends Comparable<Mechanism> {
+
+   /**
+    * Relative priority values used to arrange the found SASL
+    * mechanisms in a preferred order where the level of security
+    * generally defines the preference.
+    */
+   enum PRIORITY {
+      LOWEST(0),
+      LOW(1),
+      MEDIUM(2),
+      HIGH(3),
+      HIGHEST(4);
+
+      private final int value;
+
+      PRIORITY(int value) {
+         this.value = value;
+      }
+
+      public int getValue() {
+         return value;
+      }
+   };
+
+   /**
+    * @return return the relative priority of this SASL mechanism.
+    */
+   int getPriority();
+
+   /**
+    * @return the well known name of this SASL mechanism.
+    */
+   String getName();
+
+   /**
+    * @return the response buffer used to answer the initial SASL cycle.
+    * @throws SaslException if an error occurs computing the response.
+    */
+   byte[] getInitialResponse() throws SaslException;
+
+   /**
+    * Create a response based on a given challenge from the remote peer.
+    *
+    * @param challenge the challenge that this Mechanism should response to.
+    * @return the response that answers the given challenge.
+    * @throws SaslException if an error occurs computing the response.
+    */
+   byte[] getChallengeResponse(byte[] challenge) throws SaslException;
+
+   /**
+    * Sets the user name value for this Mechanism.  The Mechanism can ignore 
this
+    * value if it does not utilize user name in it's authentication processing.
+    *
+    * @param username The user name given.
+    */
+   void setUsername(String value);
+
+   /**
+    * Returns the configured user name value for this Mechanism.
+    *
+    * @return the currently set user name value for this Mechanism.
+    */
+   String getUsername();
+
+   /**
+    * Sets the password value for this Mechanism.  The Mechanism can ignore 
this
+    * value if it does not utilize a password in it's authentication 
processing.
+    *
+    * @param username The user name given.
+    */
+   void setPassword(String value);
+
+   /**
+    * Returns the configured password value for this Mechanism.
+    *
+    * @return the currently set password value for this Mechanism.
+    */
+   String getPassword();
+
+   /**
+    * Sets any additional Mechanism specific properties using a Map<String, 
Object>
+    *
+    * @param options the map of additional properties that this Mechanism 
should utilize.
+    */
+   void setProperties(Map<String, Object> options);
+
+   /**
+    * The currently set Properties for this Mechanism.
+    *
+    * @return the current set of configuration Properties for this Mechanism.
+    */
+   Map<String, Object> getProperties();
+
+   /**
+    * Using the configured credentials, check if the mechanism applies or not.
+    *
+    * @param username The user name that will be used with this mechanism
+    * @param password The password that will be used with this mechanism
+    * @return true if the mechanism works with the provided credentials or not.
+    */
+   boolean isApplicable(String username, String password);
+
+   /**
+    * Get the currently configured Authentication ID.
+    *
+    * @return the currently set Authentication ID.
+    */
+   String getAuthzid();
+
+   /**
+    * Sets an Authentication ID that some mechanism can use during the
+    * challenge response phase.
+    *
+    * @param authzid The Authentication ID to use.
+    */
+   void setAuthzid(String authzid);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
new file mode 100644
index 0000000..d9b3ba3
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+/**
+ * Implements the SASL PLAIN authentication Mechanism.
+ *
+ * User name and Password values are sent without being encrypted.
+ */
+public class PlainMechanism extends AbstractMechanism {
+
+   public static final String MECH_NAME = "PLAIN";
+
+   @Override
+   public int getPriority() {
+      return PRIORITY.MEDIUM.getValue();
+   }
+
+   @Override
+   public String getName() {
+      return MECH_NAME;
+   }
+
+   @Override
+   public byte[] getInitialResponse() {
+
+      String authzid = getAuthzid();
+      String username = getUsername();
+      String password = getPassword();
+
+      if (authzid == null) {
+         authzid = "";
+      }
+
+      if (username == null) {
+         username = "";
+      }
+
+      if (password == null) {
+         password = "";
+      }
+
+      byte[] authzidBytes = authzid.getBytes();
+      byte[] usernameBytes = username.getBytes();
+      byte[] passwordBytes = password.getBytes();
+      byte[] data = new byte[authzidBytes.length + 1 + usernameBytes.length + 
1 + passwordBytes.length];
+      System.arraycopy(authzidBytes, 0, data, 0, authzidBytes.length);
+      System.arraycopy(usernameBytes, 0, data, 1 + authzidBytes.length, 
usernameBytes.length);
+      System.arraycopy(passwordBytes, 0, data, 2 + authzidBytes.length + 
usernameBytes.length, passwordBytes.length);
+      return data;
+   }
+
+   @Override
+   public byte[] getChallengeResponse(byte[] challenge) {
+      return EMPTY;
+   }
+
+   @Override
+   public boolean isApplicable(String username, String password) {
+      return username != null && username.length() > 0 && password != null && 
password.length() > 0;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
new file mode 100644
index 0000000..5c25fae
--- /dev/null
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import javax.security.sasl.SaslException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.proton.engine.Sasl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage the SASL authentication process
+ */
+public class SaslAuthenticator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SaslAuthenticator.class);
+
+   private final Sasl sasl;
+   private final String username;
+   private final String password;
+   private final String authzid;
+   private Mechanism mechanism;
+   private String mechanismRestriction;
+
+   /**
+    * Create the authenticator and initialize it.
+    *
+    * @param sasl                 The Proton SASL entry point this class will 
use to manage the authentication.
+    * @param username             The user name that will be used to 
authenticate.
+    * @param password             The password that will be used to 
authenticate.
+    * @param authzid              The authzid used when authenticating 
(currently only with PLAIN)
+    * @param mechanismRestriction A particular mechanism to use (if offered by 
the server) or null to allow selection.
+    */
+   public SaslAuthenticator(Sasl sasl, String username, String password, 
String authzid, String mechanismRestriction) {
+      this.sasl = sasl;
+      this.username = username;
+      this.password = password;
+      this.authzid = authzid;
+      this.mechanismRestriction = mechanismRestriction;
+   }
+
+   /**
+    * Process the SASL authentication cycle until such time as an outcome is 
determine. This
+    * method must be called by the managing entity until the return value is 
true indicating a
+    * successful authentication or a JMSSecurityException is thrown indicating 
that the
+    * handshake failed.
+    *
+    * @throws SecurityException
+    */
+   public boolean authenticate() throws SecurityException {
+      switch (sasl.getState()) {
+         case PN_SASL_IDLE:
+            handleSaslInit();
+            break;
+         case PN_SASL_STEP:
+            handleSaslStep();
+            break;
+         case PN_SASL_FAIL:
+            handleSaslFail();
+            break;
+         case PN_SASL_PASS:
+            return true;
+         default:
+      }
+
+      return false;
+   }
+
+   private void handleSaslInit() throws SecurityException {
+      try {
+         String[] remoteMechanisms = sasl.getRemoteMechanisms();
+         if (remoteMechanisms != null && remoteMechanisms.length != 0) {
+            mechanism = findMatchingMechanism(remoteMechanisms);
+            if (mechanism != null) {
+               mechanism.setUsername(username);
+               mechanism.setPassword(password);
+               mechanism.setAuthzid(authzid);
+               // TODO - set additional options from URI.
+               // TODO - set a host value.
+
+               sasl.setMechanisms(mechanism.getName());
+               byte[] response = mechanism.getInitialResponse();
+               if (response != null && response.length != 0) {
+                  sasl.send(response, 0, response.length);
+               }
+            }
+            else {
+               // TODO - Better error message.
+               throw new SecurityException("Could not find a matching SASL 
mechanism for the remote peer.");
+            }
+         }
+      }
+      catch (SaslException se) {
+         // TODO - Better error message.
+         SecurityException jmsse = new SecurityException("Exception while 
processing SASL init.");
+         jmsse.initCause(se);
+         throw jmsse;
+      }
+   }
+
+   private Mechanism findMatchingMechanism(String... remoteMechanisms) {
+
+      Mechanism match = null;
+      List<Mechanism> found = new ArrayList<>();
+
+      for (String remoteMechanism : remoteMechanisms) {
+         if (mechanismRestriction != null && 
!mechanismRestriction.equals(remoteMechanism)) {
+            LOG.debug("Skipping {} mechanism because it is not the configured 
mechanism restriction {}", remoteMechanism, mechanismRestriction);
+            continue;
+         }
+
+         Mechanism mechanism = null;
+         if (remoteMechanism.equalsIgnoreCase("PLAIN")) {
+            mechanism = new PlainMechanism();
+         }
+         else if (remoteMechanism.equalsIgnoreCase("ANONYMOUS")) {
+            mechanism = new AnonymousMechanism();
+         }
+         else if (remoteMechanism.equalsIgnoreCase("CRAM-MD5")) {
+            mechanism = new CramMD5Mechanism();
+         }
+         else {
+            LOG.debug("Unknown remote mechanism {}, skipping", 
remoteMechanism);
+            continue;
+         }
+
+         if (mechanism.isApplicable(username, password)) {
+            found.add(mechanism);
+         }
+      }
+
+      if (!found.isEmpty()) {
+         // Sorts by priority using Mechanism comparison and return the last 
value in
+         // list which is the Mechanism deemed to be the highest priority 
match.
+         Collections.sort(found);
+         match = found.get(found.size() - 1);
+      }
+
+      LOG.info("Best match for SASL auth was: {}", match);
+
+      return match;
+   }
+
+   private void handleSaslStep() throws SecurityException {
+      try {
+         if (sasl.pending() != 0) {
+            byte[] challenge = new byte[sasl.pending()];
+            sasl.recv(challenge, 0, challenge.length);
+            byte[] response = mechanism.getChallengeResponse(challenge);
+            sasl.send(response, 0, response.length);
+         }
+      }
+      catch (SaslException se) {
+         // TODO - Better error message.
+         SecurityException jmsse = new SecurityException("Exception while 
processing SASL step.");
+         jmsse.initCause(se);
+         throw jmsse;
+      }
+   }
+
+   private void handleSaslFail() throws SecurityException {
+      // TODO - Better error message.
+      throw new SecurityException("Client failed to authenticate");
+   }
+}

Reply via email to