ARTEMIS-738 Improving TX support on AMQP

https://issues.apache.org/jira/browse/ARTEMIS-738


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/113c0c93
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/113c0c93
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/113c0c93

Branch: refs/heads/master
Commit: 113c0c9360197ef3467f3907a604fa527247c858
Parents: 5ea53c4
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Thu Sep 15 15:28:07 2016 -0400
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Sep 21 18:14:38 2016 -0400

----------------------------------------------------------------------
 .../plug/ActiveMQProtonConnectionCallback.java  |  71 ++-
 .../plug/ProtonSessionIntegrationCallback.java  |  84 ++-
 .../org/proton/plug/AMQPConnectionCallback.java |  11 +
 .../org/proton/plug/AMQPSessionCallback.java    |  18 +-
 .../context/AbstractProtonSessionContext.java   |   1 -
 .../plug/context/ProtonTransactionHandler.java  |   8 +-
 .../server/ProtonServerReceiverContext.java     |  10 +-
 .../server/ProtonServerSenderContext.java       |   7 +-
 .../server/ProtonServerSessionContext.java      |   6 +
 .../ActiveMQAMQPProtocolMessageBundle.java      |   3 +
 .../context/AbstractConnectionContextTest.java  |  17 +
 .../proton/plug/test/invm/ProtonINVMSPI.java    |  33 +
 .../plug/test/minimalclient/AMQPClientSPI.java  |  18 +
 .../minimalserver/MinimalConnectionSPI.java     |  18 +
 .../test/minimalserver/MinimalSessionSPI.java   |  22 +-
 .../artemis/core/server/ServerSession.java      |   2 +
 .../core/server/impl/ServerSessionImpl.java     |  18 +-
 .../transport/amqp/client/AmqpConnection.java   |   9 +-
 .../transport/amqp/client/AmqpMessage.java      | 129 +++-
 .../transport/amqp/client/AmqpReceiver.java     |  28 +
 .../transport/amqp/client/AmqpSender.java       |  27 +-
 .../transport/amqp/client/AmqpSession.java      |  12 +-
 .../integration/amqp/AmqpClientTestSupport.java | 194 ++++++
 .../integration/amqp/AmqpTransactionTest.java   | 625 +++++++++++++++++++
 .../tests/integration/proton/ProtonTest.java    |   3 +-
 .../integration/proton/ProtonTestBase.java      |   1 -
 .../integration/proton/ProtonTestForHeader.java |   3 -
 27 files changed, 1263 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
index ea66b01..d5b2ff7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
@@ -21,6 +21,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,8 +39,13 @@ import 
org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.jboss.logging.Logger;
@@ -47,7 +54,9 @@ import org.proton.plug.AMQPConnectionContext;
 import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.SASLResult;
 import org.proton.plug.ServerSASL;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
 import org.proton.plug.handler.ExtCapability;
+import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.proton.plug.sasl.AnonymousServerSASL;
 
 import static org.proton.plug.AmqpSupport.CONTAINER_ID;
@@ -55,8 +64,11 @@ import static org.proton.plug.AmqpSupport.INVALID_FIELD;
 import static 
org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
 
 public class ActiveMQProtonConnectionCallback implements 
AMQPConnectionCallback, FailureListener, CloseListener {
+   private static final Logger logger = 
Logger.getLogger(ActiveMQProtonConnectionCallback.class);
    private static final List<String> connectedContainers = 
Collections.synchronizedList(new ArrayList());
 
+   private ConcurrentMap<XidImpl, Transaction> transactions = new 
ConcurrentHashMap<>();
+
    private static final Logger log = 
Logger.getLogger(ActiveMQProtonConnectionCallback.class);
 
    private final ProtonProtocolManager manager;
@@ -117,11 +129,23 @@ public class ActiveMQProtonConnectionCallback implements 
AMQPConnectionCallback,
 
    @Override
    public void close() {
-      if (registeredConnectionId.getAndSet(false)) {
-         server.removeClientConnection(remoteContainerId);
+      try {
+         if (registeredConnectionId.getAndSet(false)) {
+            server.removeClientConnection(remoteContainerId);
+         }
+         connection.close();
+         amqpConnection.close();
+      }
+      finally {
+         for (Transaction tx : transactions.values()) {
+            try {
+               tx.rollback();
+            }
+            catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
       }
-      connection.close();
-      amqpConnection.close();
    }
 
    public Executor getExeuctor() {
@@ -219,4 +243,43 @@ public class ActiveMQProtonConnectionCallback implements 
AMQPConnectionCallback,
    public void connectionFailed(ActiveMQException exception, boolean 
failedOver, String scaleDownTargetNodeID) {
       close();
    }
+
+   @Override
+   public Binary newTransaction() {
+      XidImpl xid = newXID();
+      Transaction transaction = new TransactionImpl(xid, 
server.getStorageManager(), -1);
+      transactions.put(xid, transaction);
+      return new Binary(xid.getGlobalTransactionId());
+   }
+
+   @Override
+   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException 
{
+      XidImpl xid = newXID(txid.getArray());
+      Transaction tx = transactions.get(xid);
+
+      if (tx == null) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
+      }
+
+      return tx;
+   }
+
+   @Override
+   public void removeTransaction(Binary txid) {
+      XidImpl xid = newXID(txid.getArray());
+      transactions.remove(xid);
+   }
+
+
+   protected XidImpl newXID() {
+      return 
newXID(UUIDGenerator.getInstance().generateStringUUID().getBytes());
+   }
+
+   protected XidImpl newXID(byte[] bytes) {
+      return new XidImpl("amqp".getBytes(), 1, bytes);
+   }
+
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 153d033..da9dd9c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -42,7 +42,6 @@ import 
org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SelectorTranslator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -61,6 +60,7 @@ import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.AMQPSessionContext;
 import org.proton.plug.SASLResult;
 import org.proton.plug.context.ProtonPlugSender;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
 import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
 import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.proton.plug.sasl.PlainSASLResult;
@@ -282,46 +282,11 @@ public class ProtonSessionIntegrationCallback implements 
AMQPSessionCallback, Se
    }
 
    @Override
-   public Binary getCurrentTXID() {
-      Transaction tx = serverSession.getCurrentTransaction();
-      if (tx == null) {
-         tx = serverSession.newTransaction();
-         serverSession.resetTX(tx);
-      }
-      return new Binary(ByteUtil.longToBytes(tx.getID()));
-   }
-
-   @Override
    public String tempQueueName() {
       return UUIDGenerator.getInstance().generateStringUUID();
    }
 
    @Override
-   public void commitCurrentTX() throws Exception {
-      recoverContext();
-      try {
-         serverSession.commit();
-      }
-      finally {
-         resetContext();
-      }
-   }
-
-   @Override
-   public void rollbackCurrentTX(boolean lastMessageDelivered) throws 
Exception {
-      //need to check here as this can be called if init fails
-      if (serverSession != null) {
-         recoverContext();
-         try {
-            serverSession.rollback(lastMessageDelivered);
-         }
-         finally {
-            resetContext();
-         }
-      }
-   }
-
-   @Override
    public void close() throws Exception {
       //need to check here as this can be called if init fails
       if (serverSession != null) {
@@ -336,10 +301,13 @@ public class ProtonSessionIntegrationCallback implements 
AMQPSessionCallback, Se
    }
 
    @Override
-   public void ack(Object brokerConsumer, Object message) throws Exception {
+   public void ack(Transaction transaction, Object brokerConsumer, Object 
message) throws Exception {
+      if (transaction == null) {
+         transaction = serverSession.getCurrentTransaction();
+      }
       recoverContext();
       try {
-         ((ServerConsumer) 
brokerConsumer).individualAcknowledge(serverSession.getCurrentTransaction(), 
((ServerMessage) message).getMessageID());
+         ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, 
((ServerMessage) message).getMessageID());
       }
       finally {
          resetContext();
@@ -363,7 +331,8 @@ public class ProtonSessionIntegrationCallback implements 
AMQPSessionCallback, Se
    }
 
    @Override
-   public void serverSend(final Receiver receiver,
+   public void serverSend(final Transaction transaction,
+                          final Receiver receiver,
                           final Delivery delivery,
                           String address,
                           int messageFormat,
@@ -382,10 +351,10 @@ public class ProtonSessionIntegrationCallback implements 
AMQPSessionCallback, Se
       if (store.isRejectingMessages()) {
          // We drop pre-settled messages (and abort any associated Tx)
          if (delivery.remotelySettled()) {
-            if (serverSession.getCurrentTransaction() != null) {
+            if (transaction != null) {
                String amqpAddress = 
delivery.getLink().getTarget().getAddress();
                ActiveMQException e = new 
ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
-               serverSession.getCurrentTransaction().markAsRollbackOnly(e);
+               transaction.markAsRollbackOnly(e);
             }
          }
          else {
@@ -393,7 +362,7 @@ public class ProtonSessionIntegrationCallback implements 
AMQPSessionCallback, Se
          }
       }
       else {
-         serverSend(message, delivery, receiver);
+         serverSend(transaction, message, delivery, receiver);
       }
    }
 
@@ -406,11 +375,11 @@ public class ProtonSessionIntegrationCallback implements 
AMQPSessionCallback, Se
       connection.flush();
    }
 
-   private void serverSend(final ServerMessage message, final Delivery 
delivery, final Receiver receiver) throws Exception {
+   private void serverSend(final Transaction transaction, final ServerMessage 
message, final Delivery delivery, final Receiver receiver) throws Exception {
       try {
 
          
message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(),
 receiver.getSession().getConnection().getRemoteContainer());
-         serverSession.send(message, false);
+         serverSession.send(transaction, message, false, false);
 
          // FIXME Potential race here...
          manager.getServer().getStorageManager().afterCompleteOperations(new 
IOCallback() {
@@ -543,4 +512,31 @@ public class ProtonSessionIntegrationCallback implements 
AMQPSessionCallback, Se
          return false;
       }
    }
+
+   @Override
+   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException 
{
+      return protonSPI.getTransaction(txid);
+   }
+
+   @Override
+   public Binary newTransaction() {
+      return protonSPI.newTransaction();
+   }
+
+
+   @Override
+   public void commitTX(Binary txid) throws Exception {
+      Transaction tx = protonSPI.getTransaction(txid);
+      tx.commit(true);
+      protonSPI.removeTransaction(txid);
+   }
+
+   @Override
+   public void rollbackTX(Binary txid, boolean lastMessageReceived) throws 
Exception {
+      Transaction tx = protonSPI.getTransaction(txid);
+      tx.rollback();
+      protonSPI.removeTransaction(txid);
+
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
index 15a3246..f4ed64c 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
@@ -17,7 +17,10 @@
 package org.proton.plug;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.engine.Connection;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
 
 public interface AMQPConnectionCallback {
 
@@ -44,4 +47,12 @@ public interface AMQPConnectionCallback {
    void sendSASLSupported();
 
    boolean validateConnection(Connection connection, SASLResult saslResult);
+
+   Binary newTransaction();
+
+   Transaction getTransaction(Binary txid) throws ActiveMQAMQPException;
+
+   void removeTransaction(Binary txid);
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
index b6acd3f..5f3b6dd 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
@@ -18,11 +18,13 @@ package org.proton.plug;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.ProtonJMessage;
 import org.proton.plug.context.ProtonPlugSender;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
 
 /**
  * These are methods where the Proton Plug component will call your server
@@ -67,17 +69,20 @@ public interface AMQPSessionCallback {
    // This one can be a lot improved
    ProtonJMessage encodeMessage(Object message, int deliveryCount) throws 
Exception;
 
-   Binary getCurrentTXID();
-
    String tempQueueName();
 
-   void commitCurrentTX() throws Exception;
 
-   void rollbackCurrentTX(boolean lastMessageReceived) throws Exception;
+   Transaction getTransaction(Binary txid) throws ActiveMQAMQPException;
+
+   Binary newTransaction();
+
+   void commitTX(Binary txid) throws Exception;
+
+   void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception;
 
    void close() throws Exception;
 
-   void ack(Object brokerConsumer, Object message) throws Exception;
+   void ack(Transaction transaction, Object brokerConsumer, Object message) 
throws Exception;
 
    /**
     * @param brokerConsumer
@@ -96,7 +101,8 @@ public interface AMQPSessionCallback {
     * @param messageFormat
     * @param messageEncoded a Heap Buffer ByteBuffer (safe to convert into 
byte[])
     */
-   void serverSend(Receiver receiver,
+   void serverSend(Transaction transaction,
+                   Receiver receiver,
                    Delivery delivery,
                    String address,
                    int messageFormat,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
index 96f7413..5c0a626 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
@@ -140,7 +140,6 @@ public abstract class AbstractProtonSessionContext extends 
ProtonInitializable i
       senders.clear();
       try {
          if (sessionSPI != null) {
-            sessionSPI.rollbackCurrentTX(false);
             sessionSPI.close();
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
index 597b5e4..263d3e6 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
@@ -72,7 +72,7 @@ public class ProtonTransactionHandler implements 
ProtonDeliveryHandler {
          Object action = ((AmqpValue) msg.getBody()).getValue();
 
          if (action instanceof Declare) {
-            Binary txID = sessionSPI.getCurrentTXID();
+            Binary txID = sessionSPI.newTransaction();
             Declared declared = new Declared();
             declared.setTxnId(txID);
             delivery.disposition(declared);
@@ -80,9 +80,11 @@ public class ProtonTransactionHandler implements 
ProtonDeliveryHandler {
          }
          else if (action instanceof Discharge) {
             Discharge discharge = (Discharge) action;
+
+            Binary txID = discharge.getTxnId();
             if (discharge.getFail()) {
                try {
-                  sessionSPI.rollbackCurrentTX(true);
+                  sessionSPI.rollbackTX(txID, true);
                   delivery.disposition(new Accepted());
                }
                catch (Exception e) {
@@ -91,7 +93,7 @@ public class ProtonTransactionHandler implements 
ProtonDeliveryHandler {
             }
             else {
                try {
-                  sessionSPI.commitCurrentTX();
+                  sessionSPI.commitTX(txID);
                   delivery.disposition(new Accepted());
                }
                catch (ActiveMQAMQPException amqpE) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
index c564a9e..173ff28 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
@@ -18,8 +18,10 @@ package org.proton.plug.context.server;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
@@ -130,7 +132,13 @@ public class ProtonServerReceiverContext extends 
AbstractProtonReceiverContext {
 
                receiver.advance();
 
-               sessionSPI.serverSend(receiver, delivery, address, 
delivery.getMessageFormat(), buffer);
+               Transaction tx = null;
+               if (delivery.getRemoteState() instanceof TransactionalState) {
+
+                  TransactionalState txState = (TransactionalState) 
delivery.getRemoteState();
+                  tx = this.sessionSPI.getTransaction(txState.getTxnId());
+               }
+               sessionSPI.serverSend(tx, receiver, delivery, address, 
delivery.getMessageFormat(), buffer);
 
                flow(maxCreditAllocation, minCreditRefresh);
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index 2d91f37..e9bd123 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -21,6 +21,7 @@ import java.util.Objects;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
@@ -339,7 +340,9 @@ public class ProtonServerSenderContext extends 
AbstractProtonContextSender imple
       if (remoteState != null) {
          // If we are transactional then we need ack if the msg has been 
accepted
          if (remoteState instanceof TransactionalState) {
+
             TransactionalState txState = (TransactionalState) remoteState;
+            Transaction tx = 
this.sessionSPI.getTransaction(txState.getTxnId());
             if (txState.getOutcome() != null) {
                Outcome outcome = txState.getOutcome();
                if (outcome instanceof Accepted) {
@@ -353,7 +356,7 @@ public class ProtonServerSenderContext extends 
AbstractProtonContextSender imple
                   //we have to individual ack as we can't guarantee we will 
get the delivery updates (including acks) in order
                   // from dealer, a perf hit but a must
                   try {
-                     sessionSPI.ack(brokerConsumer, message);
+                     sessionSPI.ack(tx, brokerConsumer, message);
                   }
                   catch (Exception e) {
                      throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
@@ -365,7 +368,7 @@ public class ProtonServerSenderContext extends 
AbstractProtonContextSender imple
             //we have to individual ack as we can't guarantee we will get the 
delivery updates (including acks) in order
             // from dealer, a perf hit but a must
             try {
-               sessionSPI.ack(brokerConsumer, message);
+               sessionSPI.ack(null, brokerConsumer, message);
             }
             catch (Exception e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
index 46178a9..983fa4e 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
@@ -19,6 +19,7 @@ package org.proton.plug.context.server;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transaction.Coordinator;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Receiver;
@@ -60,6 +61,11 @@ public class ProtonServerSessionContext extends 
AbstractProtonSessionContext {
 
    public void addTransactionHandler(Coordinator coordinator, Receiver 
receiver) {
       ProtonTransactionHandler transactionHandler = new 
ProtonTransactionHandler(sessionSPI);
+
+      coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"),
+                                  Symbol.getSymbol("amqp:multi-txns-per-ssn"),
+                                  Symbol.getSymbol("amqp:multi-ssns-per-txn"));
+
       receiver.setContext(transactionHandler);
       receiver.open();
       receiver.flow(100);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
index 8817310..576e61a 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
@@ -74,4 +74,7 @@ public interface ActiveMQAMQPProtocolMessageBundle {
    @Message(id = 219013, value = "error committing coordinator: {0}", format = 
Message.Format.MESSAGE_FORMAT)
    ActiveMQAMQPIllegalStateException errorCommittingCoordinator(String 
message);
 
+   @Message(id = 219014, value = "Transaction not found: xid={0}", format = 
Message.Format.MESSAGE_FORMAT)
+   ActiveMQAMQPIllegalStateException txNotFound(String xidToString);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
index da7b617..825b987 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
@@ -20,7 +20,9 @@ import java.util.concurrent.Executors;
 
 import io.netty.buffer.ByteBuf;
 
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Session;
@@ -78,6 +80,21 @@ public class AbstractConnectionContextTest {
       }
 
       @Override
+      public Binary newTransaction() {
+         return null;
+      }
+
+      @Override
+      public Transaction getTransaction(Binary txid) throws 
ActiveMQAMQPException {
+         return null;
+      }
+
+      @Override
+      public void removeTransaction(Binary txid) {
+
+      }
+
+      @Override
       public void onTransport(ByteBuf bytes, AMQPConnectionContext connection) 
{
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
index 5de6e9d..a35e8ac 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
@@ -20,7 +20,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.engine.Connection;
 import org.jboss.logging.Logger;
 import org.proton.plug.AMQPConnectionContext;
@@ -29,6 +31,7 @@ import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.SASLResult;
 import org.proton.plug.ServerSASL;
 import org.proton.plug.context.server.ProtonServerConnectionContext;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
 import org.proton.plug.sasl.AnonymousServerSASL;
 import org.proton.plug.sasl.ServerSASLPlain;
 import org.proton.plug.test.minimalserver.MinimalSessionSPI;
@@ -132,6 +135,21 @@ public class ProtonINVMSPI implements 
AMQPConnectionCallback {
       return null;
    }
 
+   @Override
+   public Binary newTransaction() {
+      return null;
+   }
+
+   @Override
+   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException 
{
+      return null;
+   }
+
+   @Override
+   public void removeTransaction(Binary txid) {
+
+   }
+
    class ReturnSPI implements AMQPConnectionCallback {
 
       @Override
@@ -140,6 +158,21 @@ public class ProtonINVMSPI implements 
AMQPConnectionCallback {
       }
 
       @Override
+      public Binary newTransaction() {
+         return null;
+      }
+
+      @Override
+      public Transaction getTransaction(Binary txid) throws 
ActiveMQAMQPException {
+         return null;
+      }
+
+      @Override
+      public void removeTransaction(Binary txid) {
+
+      }
+
+      @Override
       public ServerSASL[] getSASLMechnisms() {
          return new ServerSASL[]{new AnonymousServerSASL(), new 
ServerSASLPlain()};
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
index be1571c..85e4c02 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
@@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.engine.Connection;
 import org.jboss.logging.Logger;
 import org.proton.plug.AMQPConnectionContext;
@@ -29,6 +31,7 @@ import org.proton.plug.AMQPConnectionCallback;
 import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.SASLResult;
 import org.proton.plug.ServerSASL;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
 import org.proton.plug.sasl.AnonymousServerSASL;
 import org.proton.plug.sasl.ServerSASLPlain;
 import org.proton.plug.util.ByteUtil;
@@ -75,6 +78,21 @@ public class AMQPClientSPI implements AMQPConnectionCallback 
{
    }
 
    @Override
+   public Binary newTransaction() {
+      return null;
+   }
+
+   @Override
+   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException 
{
+      return null;
+   }
+
+   @Override
+   public void removeTransaction(Binary txid) {
+
+   }
+
+   @Override
    public boolean validateConnection(Connection connection, SASLResult 
saslResult) {
       return true;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
index 1b9c919..6325ad7 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
@@ -24,7 +24,9 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.engine.Connection;
 import org.jboss.logging.Logger;
 import org.proton.plug.AMQPConnectionContext;
@@ -32,6 +34,7 @@ import org.proton.plug.AMQPConnectionCallback;
 import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.SASLResult;
 import org.proton.plug.ServerSASL;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
 import org.proton.plug.sasl.AnonymousServerSASL;
 import org.proton.plug.sasl.ServerSASLPlain;
 import org.proton.plug.util.ByteUtil;
@@ -88,6 +91,21 @@ public class MinimalConnectionSPI implements 
AMQPConnectionCallback {
    }
 
    @Override
+   public Binary newTransaction() {
+      return null;
+   }
+
+   @Override
+   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException 
{
+      return null;
+   }
+
+   @Override
+   public void removeTransaction(Binary txid) {
+
+   }
+
+   @Override
    public void onTransport(final ByteBuf bytes, final AMQPConnectionContext 
connection) {
       final int bufferSize = bytes.writerIndex();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
index f9a3533..d366c5b 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
@@ -22,7 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import 
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
@@ -123,16 +126,23 @@ public class MinimalSessionSPI implements 
AMQPSessionCallback {
    }
 
    @Override
-   public Binary getCurrentTXID() {
-      return new Binary(new byte[]{1});
+   public Transaction getTransaction(Binary txid) {
+      return new TransactionImpl(new NullStorageManager());
    }
 
    @Override
-   public void commitCurrentTX() {
+   public Binary newTransaction() {
+      return null;
+   }
+
+   @Override
+   public void commitTX(Binary txid) throws Exception {
+
    }
 
    @Override
-   public void rollbackCurrentTX(boolean lastMessage) {
+   public void rollbackTX(Binary txid, boolean lastMessageReceived) throws 
Exception {
+
    }
 
    @Override
@@ -141,7 +151,7 @@ public class MinimalSessionSPI implements 
AMQPSessionCallback {
    }
 
    @Override
-   public void ack(Object brokerConsumer, Object message) {
+   public void ack(Transaction tx, Object brokerConsumer, Object message) {
 
    }
 
@@ -157,7 +167,7 @@ public class MinimalSessionSPI implements 
AMQPSessionCallback {
    }
 
    @Override
-   public void serverSend(Receiver receiver, Delivery delivery, String 
address, int messageFormat, ByteBuf buffer) {
+   public void serverSend(Transaction tx, Receiver receiver, Delivery 
delivery, String address, int messageFormat, ByteBuf buffer) {
       ProtonServerMessage serverMessage = new ProtonServerMessage();
       serverMessage.decode(buffer.nioBuffer());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 953de1f..3521d71 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -133,6 +133,8 @@ public interface ServerSession extends SecurityAuth {
 
    void sendContinuations(int packetSize, long totalBodySize, byte[] body, 
boolean continues) throws Exception;
 
+   RoutingStatus send(Transaction tx, ServerMessage message, boolean direct, 
boolean noAutoCreateQueue) throws Exception;
+
    RoutingStatus send(ServerMessage message, boolean direct, boolean 
noAutoCreateQueue) throws Exception;
 
    RoutingStatus send(ServerMessage message, boolean direct) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index d20fa43..3ccfd16 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1256,6 +1256,10 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    @Override
    public RoutingStatus send(final ServerMessage message, final boolean 
direct, boolean noAutoCreateQueue) throws Exception {
+      return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
+   }
+
+   public RoutingStatus send(Transaction tx, final ServerMessage message, 
final boolean direct, boolean noAutoCreateQueue) throws Exception {
 
       // If the protocol doesn't support flow control, we have no choice other 
than fail the communication
       if (!this.getRemotingConnection().isSupportsFlowControl() && 
pagingManager.isDiskFull()) {
@@ -1308,10 +1312,10 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
       if (message.getAddress().equals(managementAddress)) {
          // It's a management message
 
-         handleManagementMessage(message, direct);
+         handleManagementMessage(tx, message, direct);
       }
       else {
-         result = doSend(message, direct, noAutoCreateQueue);
+         result = doSend(tx, message, direct, noAutoCreateQueue);
       }
       return result;
    }
@@ -1337,7 +1341,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
             currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 
messageBodySize);
          }
 
-         doSend(currentLargeMessage, false, false);
+         doSend(tx, currentLargeMessage, false, false);
 
          currentLargeMessage = null;
       }
@@ -1526,7 +1530,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
       started = s;
    }
 
-   private void handleManagementMessage(final ServerMessage message, final 
boolean direct) throws Exception {
+   private RoutingStatus handleManagementMessage(final Transaction tx, final 
ServerMessage message, final boolean direct) throws Exception {
       try {
          securityCheck(message.getAddress(), CheckType.MANAGE, this);
       }
@@ -1544,8 +1548,10 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
       if (replyTo != null) {
          reply.setAddress(replyTo);
 
-         doSend(reply, direct, false);
+         doSend(tx, reply, direct, false);
       }
+
+      return RoutingStatus.OK;
    }
 
    private void doRollback(final boolean clientFailed,
@@ -1600,7 +1606,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
       theTx.rollback();
    }
 
-   protected RoutingStatus doSend(final ServerMessage msg, final boolean 
direct, final boolean noAutoCreateQueue) throws Exception {
+   public RoutingStatus doSend(final Transaction tx, final ServerMessage msg, 
final boolean direct, final boolean noAutoCreateQueue) throws Exception {
       RoutingStatus result = RoutingStatus.OK;
       // check the user has write access to this address.
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 1454dd9..3e2c5d7 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -228,7 +228,14 @@ public class AmqpConnection extends 
AmqpAbstractResource<Connection> implements
                }
             }
 
-            serializer.shutdown();
+            serializer.shutdownNow();
+            try {
+               if (!serializer.awaitTermination(10, TimeUnit.SECONDS)) {
+                  LOG.warn("Serializer didn't shutdown cleanly");
+               }
+            }
+            catch (InterruptedException e) {
+            }
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 320d174..060fc4e 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -60,7 +60,8 @@ public class AmqpMessage {
     * Creates a new AmqpMessage that wraps the information necessary to handle
     * an outgoing message.
     *
-    * @param message the Proton message that is to be sent.
+    * @param message
+    *        the Proton message that is to be sent.
     */
    public AmqpMessage(Message message) {
       this(null, message, null);
@@ -70,9 +71,12 @@ public class AmqpMessage {
     * Creates a new AmqpMessage that wraps the information necessary to handle
     * an incoming delivery.
     *
-    * @param receiver the AmqpReceiver that received this message.
-    * @param message  the Proton message that was received.
-    * @param delivery the Delivery instance that produced this message.
+    * @param receiver
+    *        the AmqpReceiver that received this message.
+    * @param message
+    *        the Proton message that was received.
+    * @param delivery
+    *        the Delivery instance that produced this message.
     */
    @SuppressWarnings("unchecked")
    public AmqpMessage(AmqpReceiver receiver, Message message, Delivery 
delivery) {
@@ -136,10 +140,29 @@ public class AmqpMessage {
    }
 
    /**
+    * Accepts the message marking it as consumed on the remote peer.
+    *
+    * @param txnSession
+    *      The session that is used to manage acceptance of the message.
+    *
+    * @throws Exception if an error occurs during the accept.
+    */
+   public void accept(AmqpSession txnSession) throws Exception {
+      if (receiver == null) {
+         throw new IllegalStateException("Can't accept non-received message.");
+      }
+
+      receiver.accept(delivery, txnSession);
+   }
+
+   /**
     * Marks the message as Modified, indicating whether it failed to deliver 
and is not deliverable here.
     *
-    * @param deliveryFailed    indicates that the delivery failed for some 
reason.
-    * @param undeliverableHere marks the delivery as not being able to be 
process by link it was sent to.
+    * @param deliveryFailed
+    *        indicates that the delivery failed for some reason.
+    * @param undeliverableHere
+    *        marks the delivery as not being able to be processed by the link it was 
sent to.
+    *
     * @throws Exception if an error occurs during the process.
     */
    public void modified(Boolean deliveryFailed, Boolean undeliverableHere) 
throws Exception {
@@ -166,9 +189,35 @@ public class AmqpMessage {
    //----- Convenience methods for constructing outbound messages -----------//
 
    /**
+    * Sets the address which is applied to the AMQP message To field in the 
message properties
+    *
+    * @param address
+    *      The address that should be applied in the Message To field.
+    */
+   public void setAddress(String address) {
+      checkReadOnly();
+      lazyCreateProperties();
+      getWrappedMessage().setAddress(address);
+   }
+
+   /**
+    * Returns the address that was set in the Message To field.
+    *
+    * @return the address in String form, or null if not set.
+    */
+   public String getAddress() {
+      if (message.getProperties() == null) {
+         return null;
+      }
+
+      return message.getProperties().getTo();
+   }
+
+   /**
     * Sets the MessageId property on an outbound message using the provided 
String
     *
-    * @param messageId the String message ID value to set.
+    * @param messageId
+    *        the String message ID value to set.
     */
    public void setMessageId(String messageId) {
       checkReadOnly();
@@ -207,7 +256,8 @@ public class AmqpMessage {
    /**
     * Sets the MessageId property on an outbound message using the provided 
value
     *
-    * @param messageId the message ID value to set.
+    * @param messageId
+    *        the message ID value to set.
     */
    public void setRawMessageId(Object messageId) {
       checkReadOnly();
@@ -218,7 +268,8 @@ public class AmqpMessage {
    /**
     * Sets the CorrelationId property on an outbound message using the 
provided String
     *
-    * @param correlationId the String Correlation ID value to set.
+    * @param correlationId
+    *        the String Correlation ID value to set.
     */
    public void setCorrelationId(String correlationId) {
       checkReadOnly();
@@ -257,7 +308,8 @@ public class AmqpMessage {
    /**
     * Sets the CorrelationId property on an outbound message using the 
provided value
     *
-    * @param correlationId the correlation ID value to set.
+    * @param correlationId
+    *        the correlation ID value to set.
     */
    public void setRawCorrelationId(Object correlationId) {
       checkReadOnly();
@@ -268,7 +320,8 @@ public class AmqpMessage {
    /**
     * Sets the GroupId property on an outbound message using the provided 
String
     *
-    * @param groupId the String Group ID value to set.
+    * @param groupId
+    *        the String Group ID value to set.
     */
    public void setGroupId(String groupId) {
       checkReadOnly();
@@ -293,7 +346,8 @@ public class AmqpMessage {
    /**
     * Sets the durable header on the outgoing message.
     *
-    * @param durable the boolean durable value to set.
+    * @param durable
+    *        the boolean durable value to set.
     */
    public void setDurable(boolean durable) {
       checkReadOnly();
@@ -318,8 +372,10 @@ public class AmqpMessage {
    /**
     * Sets a given application property on an outbound message.
     *
-    * @param key   the name to assign the new property.
-    * @param value the value to set for the named property.
+    * @param key
+    *        the name to assign the new property.
+    * @param value
+    *        the value to set for the named property.
     */
    public void setApplicationProperty(String key, Object value) {
       checkReadOnly();
@@ -331,8 +387,10 @@ public class AmqpMessage {
     * Gets the application property that is mapped to the given name or null
     * if no property has been set with that name.
     *
-    * @param key the name used to lookup the property in the application 
properties.
-    * @return the propety value or null if not set.
+    * @param key
+    *        the name used to lookup the property in the application 
properties.
+    *
+    * @return the property value or null if not set.
     */
    public Object getApplicationProperty(String key) {
       if (applicationPropertiesMap == null) {
@@ -346,8 +404,10 @@ public class AmqpMessage {
     * Perform a proper annotation set on the AMQP Message based on a Symbol 
key and
     * the target value to append to the current annotations.
     *
-    * @param key   The name of the Symbol whose value is being set.
-    * @param value The new value to set in the annotations of this message.
+    * @param key
+    *        The name of the Symbol whose value is being set.
+    * @param value
+    *        The new value to set in the annotations of this message.
     */
    public void setMessageAnnotation(String key, Object value) {
       checkReadOnly();
@@ -360,7 +420,9 @@ public class AmqpMessage {
     * that annotation name.  If the message annotations have not been created 
yet
     * then this method will always return null.
     *
-    * @param key the Symbol name that should be looked up in the message 
annotations.
+    * @param key
+    *        the Symbol name that should be looked up in the message 
annotations.
+    *
     * @return the value of the annotation if it exists, or null if not set or 
not accessible.
     */
    public Object getMessageAnnotation(String key) {
@@ -375,8 +437,10 @@ public class AmqpMessage {
     * Perform a proper delivery annotation set on the AMQP Message based on a 
Symbol
     * key and the target value to append to the current delivery annotations.
     *
-    * @param key   The name of the Symbol whose value is being set.
-    * @param value The new value to set in the delivery annotations of this 
message.
+    * @param key
+    *        The name of the Symbol whose value is being set.
+    * @param value
+    *        The new value to set in the delivery annotations of this message.
     */
    public void setDeliveryAnnotation(String key, Object value) {
       checkReadOnly();
@@ -389,7 +453,9 @@ public class AmqpMessage {
     * that annotation name.  If the message annotations have not been created 
yet
     * then this method will always return null.
     *
-    * @param key the Symbol name that should be looked up in the message 
annotations.
+    * @param key
+    *        the Symbol name that should be looked up in the message 
annotations.
+    *
     * @return the value of the annotation if it exists, or null if not set or 
not accessible.
     */
    public Object getDeliveryAnnotation(String key) {
@@ -406,7 +472,9 @@ public class AmqpMessage {
     * Sets a String value into the body of an outgoing Message, throws
     * an exception if this is an incoming message instance.
     *
-    * @param value the String value to store in the Message body.
+    * @param value
+    *        the String value to store in the Message body.
+    *
     * @throws IllegalStateException if the message is read only.
     */
    public void setText(String value) throws IllegalStateException {
@@ -419,7 +487,9 @@ public class AmqpMessage {
     * Sets a byte array value into the body of an outgoing Message, throws
     * an exception if this is an incoming message instance.
     *
-    * @param bytes the byte array value to store in the Message body.
+    * @param bytes
+    *        the byte array value to store in the Message body.
+    *
     * @throws IllegalStateException if the message is read only.
     */
    public void setBytes(byte[] bytes) throws IllegalStateException {
@@ -432,7 +502,9 @@ public class AmqpMessage {
     * Sets a byte array value into the body of an outgoing Message, throws
     * an exception if this is an incoming message instance.
     *
-    * @param described the byte array value to store in the Message body.
+    * @param described
+    *        the DescribedType value to store in the Message body.
+    *
     * @throws IllegalStateException if the message is read only.
     */
    public void setDescribedType(DescribedType described) throws 
IllegalStateException {
@@ -445,6 +517,7 @@ public class AmqpMessage {
     * Attempts to retrieve the message body as an DescribedType instance.
     *
     * @return an DescribedType instance if one is stored in the message body.
+    *
     * @throws NoSuchElementException if the body does not contain a 
DescribedType.
     */
    public DescribedType getDescribedType() throws NoSuchElementException {
@@ -482,21 +555,21 @@ public class AmqpMessage {
 
    private void lazyCreateMessageAnnotations() {
       if (messageAnnotationsMap == null) {
-         messageAnnotationsMap = new HashMap<>();
+         messageAnnotationsMap = new HashMap<Symbol, Object>();
          message.setMessageAnnotations(new 
MessageAnnotations(messageAnnotationsMap));
       }
    }
 
    private void lazyCreateDeliveryAnnotations() {
       if (deliveryAnnotationsMap == null) {
-         deliveryAnnotationsMap = new HashMap<>();
+         deliveryAnnotationsMap = new HashMap<Symbol, Object>();
          message.setDeliveryAnnotations(new 
DeliveryAnnotations(deliveryAnnotationsMap));
       }
    }
 
    private void lazyCreateApplicationProperties() {
       if (applicationPropertiesMap == null) {
-         applicationPropertiesMap = new HashMap<>();
+         applicationPropertiesMap = new HashMap<String, Object>();
          message.setApplicationProperties(new 
ApplicationProperties(applicationPropertiesMap));
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 9f3bff2..2802751 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -208,6 +208,7 @@ public class AmqpReceiver extends 
AmqpAbstractResource<Receiver> {
     * it is returned immediately otherwise this methods return null without 
waiting.
     *
     * @return a newly received message or null if there is no currently 
available message.
+    *
     * @throws Exception if an error occurs during the receive attempt.
     */
    public AmqpMessage receiveNoWait() throws Exception {
@@ -219,6 +220,7 @@ public class AmqpReceiver extends 
AmqpAbstractResource<Receiver> {
     * Request a remote peer send a Message to this client waiting until one 
arrives.
     *
     * @return the pulled AmqpMessage or null if none was pulled from the 
remote.
+    *
     * @throws IOException if an error occurs
     */
    public AmqpMessage pull() throws IOException {
@@ -402,12 +404,38 @@ public class AmqpReceiver extends 
AmqpAbstractResource<Receiver> {
     * @throws IOException if an error occurs while sending the accept.
     */
    public void accept(final Delivery delivery) throws IOException {
+      accept(delivery, this.session);
+   }
+
+   /**
+    * Accepts a message that was dispatched under the given Delivery instance.
+    *
+    * This method allows for the session that is used in the accept to be 
specified by the
+    * caller.  This allows for an accepted message to be involved in a 
transaction that is
+    * being managed by some other session other than the one that created this 
receiver.
+    *
+    * @param delivery
+    *        the Delivery instance to accept.
+    * @param session
+    *        the session under which the message is being accepted.
+    *
+    * @throws IOException if an error occurs while sending the accept.
+    */
+   public void accept(final Delivery delivery, final AmqpSession session) 
throws IOException {
       checkClosed();
 
       if (delivery == null) {
          throw new IllegalArgumentException("Delivery to accept cannot be 
null");
       }
 
+      if (session == null) {
+         throw new IllegalArgumentException("Session given cannot be null");
+      }
+
+      if (session.getConnection() != this.session.getConnection()) {
+         throw new IllegalArgumentException("The session used for accept must 
originate from the connection that created this receiver.");
+      }
+
       final ClientFuture request = new ClientFuture();
       session.getScheduler().execute(new Runnable() {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 5ae2948..ed83e02 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -118,6 +118,18 @@ public class AmqpSender extends 
AmqpAbstractResource<Sender> {
     */
    public void send(final AmqpMessage message) throws IOException {
       checkClosed();
+      send(message, null);
+   }
+
+   /**
+    * Sends the given message to this sender's assigned address using the 
supplied transaction ID.
+    *
+    * @param message the message to send.
+    * @param txId    the transaction ID to assign the outgoing send.
+    * @throws IOException if an error occurs during the send.
+    */
+   public void send(final AmqpMessage message, final AmqpTransactionId txId) 
throws IOException {
+      checkClosed();
       final ClientFuture sendRequest = new ClientFuture();
 
       session.getScheduler().execute(new Runnable() {
@@ -125,7 +137,7 @@ public class AmqpSender extends 
AmqpAbstractResource<Sender> {
          @Override
          public void run() {
             try {
-               doSend(message, sendRequest);
+               doSend(message, sendRequest, txId);
                session.pumpToProtonTransport(sendRequest);
             }
             catch (Exception e) {
@@ -316,7 +328,7 @@ public class AmqpSender extends 
AmqpAbstractResource<Sender> {
       }
    }
 
-   private void doSend(AmqpMessage message, AsyncResult request) throws 
Exception {
+   private void doSend(AmqpMessage message, AsyncResult request, 
AmqpTransactionId txId) throws Exception {
       LOG.trace("Producer sending message: {}", message);
 
       Delivery delivery = null;
@@ -330,8 +342,15 @@ public class AmqpSender extends 
AmqpAbstractResource<Sender> {
 
       delivery.setContext(request);
 
-      if (session.isInTransaction()) {
-         Binary amqpTxId = session.getTransactionId().getRemoteTxId();
+      Binary amqpTxId = null;
+      if (txId != null) {
+         amqpTxId = txId.getRemoteTxId();
+      }
+      else if (session.isInTransaction()) {
+         amqpTxId = session.getTransactionId().getRemoteTxId();
+      }
+
+      if (amqpTxId != null) {
          TransactionalState state = new TransactionalState();
          state.setTxnId(amqpTxId);
          delivery.disposition(state);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 82b6aec..755ecf8 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -408,11 +408,15 @@ public class AmqpSession extends 
AmqpAbstractResource<Session> {
       connection.pumpToProtonTransport(request);
    }
 
-   AmqpTransactionId getTransactionId() {
-      return txContext.getTransactionId();
+   public AmqpTransactionId getTransactionId() {
+      if (txContext != null && txContext.isInTransaction()) {
+         return txContext.getTransactionId();
+      }
+
+      return null;
    }
 
-   public AmqpTransactionContext getTransactionContext() {
+   AmqpTransactionContext getTransactionContext() {
       return txContext;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
new file mode 100644
index 0000000..8fa140a
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.net.URI;
+import java.util.LinkedList;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test support class for tests that will be using the AMQP Proton wrapper client.
+ * This is to make it easier to migrate tests from ActiveMQ5.
+ * <p>
+ * {@link #setUp()} creates and starts a broker through {@code createServer(true, true)};
+ * {@link #tearDown()} closes every connection registered with
+ * {@link #addConnection(AmqpConnection)} and then stops that broker.
+ */
+public class AmqpClientTestSupport extends ActiveMQTestBase {
+
+   /** Broker created and started by {@link #setUp()}; stays {@code null} if set-up fails early. */
+   ActiveMQServer server;
+
+   /** Connections registered through {@link #addConnection(AmqpConnection)}, closed in {@link #tearDown()}. */
+   LinkedList<AmqpConnection> connections = new LinkedList<>();
+
+   private String connectorScheme = "amqp";
+   private boolean useSSL;
+
+   /**
+    * Creates a support instance using the default {@code "amqp"} connector scheme without SSL.
+    */
+   public AmqpClientTestSupport() {
+   }
+
+   /**
+    * @param connectorScheme the scheme string consulted by the {@code isUse*Connector()} predicates.
+    * @param useSSL          whether SSL based URIs should be produced.
+    */
+   public AmqpClientTestSupport(String connectorScheme, boolean useSSL) {
+      this.connectorScheme = connectorScheme;
+      this.useSSL = useSSL;
+   }
+
+   /**
+    * Registers a connection so that it is closed automatically in {@link #tearDown()}.
+    *
+    * @param connection the connection to track.
+    * @return the same connection instance, to allow call chaining.
+    */
+   protected AmqpConnection addConnection(AmqpConnection connection) {
+      connections.add(connection);
+      return connection;
+   }
+
+   /**
+    * Runs the base class set-up and then creates and starts the broker.
+    *
+    * @throws Exception if the base set-up or the broker start fails.
+    */
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      server.start();
+   }
+
+   /**
+    * Closes all tracked connections, stops the broker and finally runs the base class tear-down.
+    * <p>
+    * The resources owned by this class are released before {@code super.tearDown()} is invoked,
+    * and the base tear-down runs in a {@code finally} block so it still runs when stopping the broker fails.
+    *
+    * @throws Exception if stopping the broker or the base tear-down fails.
+    */
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      try {
+         for (AmqpConnection conn : connections) {
+            try {
+               conn.close();
+            }
+            catch (Throwable t) {
+               // Best effort: a connection that fails to close must not prevent the remaining clean-up.
+               t.printStackTrace();
+            }
+         }
+         connections.clear();
+
+         // setUp() may have failed before the server was assigned.
+         if (server != null) {
+            server.stop();
+         }
+      }
+      finally {
+         super.tearDown();
+      }
+   }
+
+   /**
+    * Looks up a queue on the broker started by this test.
+    *
+    * @param queueName the name of the queue to locate.
+    * @return the result of {@code server.locateQueue(...)} for the given name.
+    */
+   public Queue getProxyToQueue(String queueName) {
+      return server.locateQueue(SimpleString.toSimpleString(queueName));
+   }
+
+   /**
+    * @return {@code "jms.queue."} followed by the value of {@code getName()}.
+    */
+   public String getTestName() {
+      return "jms.queue." + getName();
+   }
+
+   /**
+    * @return the connector scheme this instance was configured with.
+    */
+   public String getConnectorScheme() {
+      return connectorScheme;
+   }
+
+   /**
+    * @return {@code true} if this instance was configured to use SSL.
+    */
+   public boolean isUseSSL() {
+      return useSSL;
+   }
+
+   /**
+    * Hook for subclasses to append a query string to the broker URI.
+    *
+    * @return the options to place after {@code '?'} in the connection URI; empty by default.
+    */
+   public String getAmqpConnectionURIOptions() {
+      return "";
+   }
+
+   /**
+    * @return {@code true} when SSL is off and the scheme contains neither {@code "nio"} nor {@code "ws"}.
+    */
+   protected boolean isUseTcpConnector() {
+      return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws");
+   }
+
+   /**
+    * @return {@code true} when SSL is on and the scheme contains neither {@code "nio"} nor {@code "wss"}.
+    */
+   protected boolean isUseSslConnector() {
+      return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss");
+   }
+
+   /**
+    * @return {@code true} when SSL is off and the scheme contains {@code "nio"}.
+    */
+   protected boolean isUseNioConnector() {
+      return !isUseSSL() && connectorScheme.contains("nio");
+   }
+
+   /**
+    * @return {@code true} when SSL is on and the scheme contains {@code "nio"}.
+    */
+   protected boolean isUseNioPlusSslConnector() {
+      return isUseSSL() && connectorScheme.contains("nio");
+   }
+
+   /**
+    * @return {@code true} when SSL is off and the scheme contains {@code "ws"}.
+    */
+   protected boolean isUseWsConnector() {
+      return !isUseSSL() && connectorScheme.contains("ws");
+   }
+
+   /**
+    * @return {@code true} when SSL is on and the scheme contains {@code "wss"}.
+    */
+   protected boolean isUseWssConnector() {
+      return isUseSSL() && connectorScheme.contains("wss");
+   }
+
+   /**
+    * Builds the URI used to connect to the broker on {@code 127.0.0.1:61616}, appending
+    * {@link #getAmqpConnectionURIOptions()} as a query string when it is not empty.
+    *
+    * @return the broker connection URI.
+    * @throws RuntimeException wrapping the original failure if the URI cannot be created.
+    */
+   public URI getBrokerAmqpConnectionURI() {
+      // NOTE: this flag is never enabled here, so the "ws"/"wss" branches below are currently unreachable.
+      boolean webSocket = false;
+
+      try {
+         int port = 61616;
+
+         String uri;
+
+         if (isUseSSL()) {
+            if (webSocket) {
+               uri = "wss://127.0.0.1:" + port;
+            }
+            else {
+               uri = "ssl://127.0.0.1:" + port;
+            }
+         }
+         else {
+            if (webSocket) {
+               uri = "ws://127.0.0.1:" + port;
+            }
+            else {
+               uri = "tcp://127.0.0.1:" + port;
+            }
+         }
+
+         String options = getAmqpConnectionURIOptions();
+         if (!options.isEmpty()) {
+            uri = uri + "?" + options;
+         }
+
+         return new URI(uri);
+      }
+      catch (Exception e) {
+         // Keep the cause so a malformed URI (e.g. bad options) is diagnosable from the test failure.
+         throw new RuntimeException("Failed to create the broker AMQP connection URI", e);
+      }
+   }
+
+   /**
+    * Connects to {@link #getBrokerAmqpConnectionURI()} without credentials.
+    */
+   public AmqpConnection createAmqpConnection() throws Exception {
+      return createAmqpConnection(getBrokerAmqpConnectionURI());
+   }
+
+   /**
+    * Connects to {@link #getBrokerAmqpConnectionURI()} with the given credentials.
+    */
+   public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
+      return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
+   }
+
+   /**
+    * Connects to the given broker URI without credentials.
+    */
+   public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
+      return createAmqpConnection(brokerURI, null, null);
+   }
+
+   /**
+    * Creates a client for the given URI and credentials and returns the result of its {@code connect()} call.
+    * The returned connection is not registered with {@link #addConnection(AmqpConnection)}.
+    */
+   public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
+      return createAmqpClient(brokerURI, username, password).connect();
+   }
+
+   /**
+    * Creates a client for {@link #getBrokerAmqpConnectionURI()} without credentials.
+    */
+   public AmqpClient createAmqpClient() throws Exception {
+      return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
+   }
+
+   /**
+    * Creates a client for the given broker URI without credentials.
+    */
+   public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
+      return createAmqpClient(brokerURI, null, null);
+   }
+
+   /**
+    * Creates a client for {@link #getBrokerAmqpConnectionURI()} with the given credentials.
+    */
+   public AmqpClient createAmqpClient(String username, String password) throws Exception {
+      return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
+   }
+
+   /**
+    * Creates a new, unconnected client for the given URI and credentials.
+    */
+   public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
+      return new AmqpClient(brokerURI, username, password);
+   }
+}

Reply via email to