http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
new file mode 100644
index 0000000..ec37710
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -0,0 +1,599 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.InvalidDestinationException;
+
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Receiver class that manages a Proton receiver endpoint.
+ */
+public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpReceiver.class);
+
+    // TODO: Use constants available from Proton 0.9
+    private static final Symbol ACCEPTED_DESCRIPTOR_SYMBOL = 
Symbol.valueOf("amqp:accepted:list");
+    private static final Symbol REJECTED_DESCRIPTOR_SYMBOL = 
Symbol.valueOf("amqp:rejected:list");
+    private static final Symbol MODIFIED_DESCRIPTOR_SYMBOL = 
Symbol.valueOf("amqp:modified:list");
+    private static final Symbol RELEASED_DESCRIPTOR_SYMBOL = 
Symbol.valueOf("amqp:released:list");
+
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    private final BlockingQueue<AmqpMessage> prefetch = new 
LinkedBlockingDeque<AmqpMessage>();
+
+    private final AmqpSession session;
+    private final String address;
+    private final String receiverId;
+
+    private String subscriptionName;
+    private String selector;
+    private boolean presettle;
+    private boolean noLocal;
+
+    /**
+     * Builds a receiver that is bound to a parent session and a source address.
+     * No link is created here; the values are only recorded for later use.
+     *
+     * @param session
+     *        The parent session that created the receiver.
+     * @param address
+     *        The address that this receiver should listen on.
+     * @param receiverId
+     *        The unique ID assigned to this receiver.
+     */
+    public AmqpReceiver(AmqpSession session, String address, String receiverId) {
+        this.receiverId = receiverId;
+        this.address = address;
+        this.session = session;
+    }
+
+    /**
+     * Close the receiver, a closed receiver will throw exceptions if any further
+     * receive, credit or disposition calls are made.
+     *
+     * Only the first call performs the close; later calls return immediately.  The
+     * close itself is run on the session's scheduler and this method waits on the
+     * request before returning.
+     *
+     * @throws IOException if an error occurs while closing the receiver.
+     */
+    public void close() throws IOException {
+        if (!closed.compareAndSet(false, true)) {
+            return;
+        }
+
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+                    close(request);
+                    session.pumpToProtonTransport();
+                } catch (Exception e) {
+                    // Any failure raised on the scheduler thread must be reported through
+                    // the request, otherwise nothing ever completes it and the caller
+                    // waiting in sync() is never released.
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * @return the parent AmqpSession that was supplied when this receiver was created.
+     */
+    public AmqpSession getSession() {
+        return session;
+    }
+
+    /**
+     * @return the address that this receiver has been configured to listen on, as
+     *         supplied at construction time.
+     */
+    public String getAddress() {
+        return address;
+    }
+
+    /**
+     * Blocks until a message is available in this receiver's prefetch buffer and
+     * then returns it.  There is no timeout on the wait.
+     *
+     * @return a newly received message sent to this receiver.
+     *
+     * @throws Exception if the receiver is closed or the wait fails.
+     */
+    public AmqpMessage receive() throws Exception {
+        checkClosed();
+        final AmqpMessage next = prefetch.take();
+        return next;
+    }
+
+    /**
+     * Waits up to the given amount of time for a message to become available in
+     * this receiver's prefetch buffer.
+     *
+     * @param timeout
+     *        the time to wait for a new message to arrive.
+     * @param unit
+     *        the unit of time that the timeout value represents.
+     *
+     * @return a newly received message or null if the time to wait period expires.
+     *
+     * @throws Exception if the receiver is closed or the wait fails.
+     */
+    public AmqpMessage receive(long timeout, TimeUnit unit) throws Exception {
+        checkClosed();
+        final AmqpMessage next = prefetch.poll(timeout, unit);
+        return next;
+    }
+
+    /**
+     * Polls this receiver's prefetch buffer once without waiting.
+     *
+     * @return a buffered message, or null if there is no currently available message.
+     *
+     * @throws Exception if the receiver is closed.
+     */
+    public AmqpMessage receiveNoWait() throws Exception {
+        checkClosed();
+        final AmqpMessage next = prefetch.poll();
+        return next;
+    }
+
+    /**
+     * Controls the amount of credit given to the receiver link.  The flow is issued
+     * on the session's scheduler and this method waits on the request before returning.
+     *
+     * @param credit
+     *        the amount of credit to grant.
+     *
+     * @throws IOException if an error occurs while sending the flow.
+     */
+    public void flow(final int credit) throws IOException {
+        checkClosed();
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    // Re-checked inside the try: if the receiver closed after the task was
+                    // queued the request must be failed, otherwise nothing completes it and
+                    // the caller waiting in sync() is never released.
+                    checkClosed();
+                    getEndpoint().flow(credit);
+                    session.pumpToProtonTransport();
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Attempts to drain a given amount of credit from the link.  The drain is issued
+     * on the session's scheduler and this method waits on the request before returning.
+     *
+     * @param credit
+     *        the amount of credit to drain.
+     *
+     * @throws IOException if an error occurs while sending the drain.
+     */
+    public void drain(final int credit) throws IOException {
+        checkClosed();
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    // Re-checked inside the try: if the receiver closed after the task was
+                    // queued the request must be failed, otherwise nothing completes it and
+                    // the caller waiting in sync() is never released.
+                    checkClosed();
+                    getEndpoint().drain(credit);
+                    session.pumpToProtonTransport();
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Accepts a message that was dispatched under the given Delivery instance.  A
+     * delivery that is already settled is left untouched and the call still succeeds.
+     *
+     * @param delivery
+     *        the Delivery instance to accept.
+     *
+     * @throws IOException if an error occurs while sending the accept.
+     * @throws IllegalArgumentException if the given delivery is null.
+     */
+    public void accept(final Delivery delivery) throws IOException {
+        checkClosed();
+
+        if (delivery == null) {
+            throw new IllegalArgumentException("Delivery to accept cannot be null");
+        }
+
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    // Re-checked inside the try: if the receiver closed after the task was
+                    // queued the request must be failed, otherwise nothing completes it and
+                    // the caller waiting in sync() is never released.
+                    checkClosed();
+                    if (!delivery.isSettled()) {
+                        delivery.disposition(Accepted.getInstance());
+                        delivery.settle();
+                        session.pumpToProtonTransport();
+                    }
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Reject a message that was dispatched under the given Delivery instance by
+     * settling it with a Modified disposition.  A delivery that is already settled
+     * is left untouched and the call still succeeds.
+     *
+     * @param delivery
+     *        the Delivery instance to reject.
+     * @param undeliverableHere
+     *        marks the delivery as not being able to be processed by the link it was sent to.
+     * @param deliveryFailed
+     *        indicates that the delivery failed for some reason.
+     *
+     * @throws IOException if an error occurs while sending the reject.
+     * @throws IllegalArgumentException if the given delivery is null.
+     */
+    public void reject(final Delivery delivery, final boolean undeliverableHere, final boolean deliveryFailed) throws IOException {
+        checkClosed();
+
+        if (delivery == null) {
+            throw new IllegalArgumentException("Delivery to reject cannot be null");
+        }
+
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    // Re-checked inside the try: if the receiver closed after the task was
+                    // queued the request must be failed, otherwise nothing completes it and
+                    // the caller waiting in sync() is never released.
+                    checkClosed();
+                    if (!delivery.isSettled()) {
+                        Modified disposition = new Modified();
+                        disposition.setUndeliverableHere(undeliverableHere);
+                        disposition.setDeliveryFailed(deliveryFailed);
+                        delivery.disposition(disposition);
+                        delivery.settle();
+                        session.pumpToProtonTransport();
+                    }
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Release a message that was dispatched under the given Delivery instance.  A
+     * delivery that is already settled is left untouched and the call still succeeds.
+     *
+     * @param delivery
+     *        the Delivery instance to release.
+     *
+     * @throws IOException if an error occurs while sending the release.
+     * @throws IllegalArgumentException if the given delivery is null.
+     */
+    public void release(final Delivery delivery) throws IOException {
+        checkClosed();
+
+        if (delivery == null) {
+            throw new IllegalArgumentException("Delivery to release cannot be null");
+        }
+
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    // Re-checked inside the try: if the receiver closed after the task was
+                    // queued the request must be failed, otherwise nothing completes it and
+                    // the caller waiting in sync() is never released.
+                    checkClosed();
+                    if (!delivery.isSettled()) {
+                        delivery.disposition(Released.getInstance());
+                        delivery.settle();
+                        session.pumpToProtonTransport();
+                    }
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Wraps the current endpoint in a new UnmodifiableReceiver on every call.
+     *
+     * @return an unmodifiable view of the underlying Receiver instance.
+     */
+    public Receiver getReceiver() {
+        final Receiver readOnlyView = new UnmodifiableReceiver(getEndpoint());
+        return readOnlyView;
+    }
+
+    //----- Receiver configuration properties 
--------------------------------//
+
+    /**
+     * @return true if this receiver is configured as presettled, in which case the
+     *         link is opened with a sender settle mode of SETTLED rather than UNSETTLED.
+     */
+    public boolean isPresettle() {
+        return presettle;
+    }
+
+    /**
+     * Sets the presettle flag.  The value is read when the link is opened, so it
+     * presumably needs to be set before the open to have any effect.
+     *
+     * @param presettle
+     *        true to request a sender settle mode of SETTLED when the link is opened.
+     */
+    public void setPresettle(boolean presettle) {
+        this.presettle = presettle;
+    }
+
+    /**
+     * Indicates whether this receiver represents a durable subscription.  A receiver
+     * is durable only when a non-empty subscription name has been set, which is the
+     * same test used when the link name and Source are configured for the open.
+     *
+     * @return true if a non-empty subscription name is configured.
+     */
+    public boolean isDurable() {
+        // An empty name is configured as a non-durable link when opening, so it must
+        // not be reported as durable here (which would detach instead of close).
+        return subscriptionName != null && !subscriptionName.isEmpty();
+    }
+
+    /**
+     * @return the configured subscription name, or null if none has been set.
+     */
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    /**
+     * Sets the subscription name.  When non-empty it is used as the link name and
+     * selects the durable Source configuration when the link is opened.
+     *
+     * @param subscriptionName
+     *        the subscription name to use, or null for none.
+     */
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    /**
+     * @return the configured selector string, or null if none has been set.
+     */
+    public String getSelector() {
+        return selector;
+    }
+
+    /**
+     * Sets the selector.  A non-blank value is added to the Source filter map under
+     * the JMS selector name when the Source is configured.
+     *
+     * @param selector
+     *        the selector string to apply, or null for none.
+     */
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    /**
+     * @return true if the no-local filter will be added to the Source filter map.
+     */
+    public boolean isNoLocal() {
+        return noLocal;
+    }
+
+    /**
+     * Sets the no-local flag, which is read when the Source is configured.
+     *
+     * @param noLocal
+     *        true to add the no-local filter to the Source filter map.
+     */
+    public void setNoLocal(boolean noLocal) {
+        this.noLocal = noLocal;
+    }
+
+    //----- Internal implementation 
------------------------------------------//
+
+    @Override
+    protected void doOpen() {
+
+        Source source = new Source();
+        source.setAddress(address);
+        Target target = new Target();
+
+        configureSource(source);
+
+        String receiverName = receiverId + ":" + address;
+
+        if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) 
{
+            // In the case of Durable Topic Subscriptions the client must use 
the same
+            // receiver name which is derived from the subscription name 
property.
+            receiverName = getSubscriptionName();
+        }
+
+        Receiver receiver = session.getEndpoint().receiver(receiverName);
+        receiver.setSource(source);
+        receiver.setTarget(target);
+        if (isPresettle()) {
+            receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
+        } else {
+            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        }
+        receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        setEndpoint(receiver);
+
+        super.doOpen();
+    }
+
+    /**
+     * Completes the open only when the attach response carried a non-null Source.
+     */
+    @Override
+    protected void doOpenCompletion() {
+        if (getEndpoint().getRemoteSource() == null) {
+            // No link terminus was created, the peer will now detach/close us.
+            return;
+        }
+
+        super.doOpenCompletion();
+    }
+
+    /**
+     * Ends the link: a durable receiver is detached, any other receiver is closed.
+     */
+    @Override
+    protected void doClose() {
+        if (!isDurable()) {
+            getEndpoint().close();
+            return;
+        }
+
+        getEndpoint().detach();
+    }
+
+    @Override
+    protected Exception getOpenAbortException() {
+        // Verify the attach response contained a non-null Source
+        org.apache.qpid.proton.amqp.transport.Source s = 
getEndpoint().getRemoteSource();
+        if (s != null) {
+            return super.getOpenAbortException();
+        } else {
+            // No link terminus was created, the peer has detach/closed us, 
create IDE.
+            return new InvalidDestinationException("Link creation was 
refused");
+        }
+    }
+
+    /**
+     * Hands an unmodifiable view of the receiver to the state inspector's
+     * opened-resource check.
+     */
+    @Override
+    protected void doOpenInspection() {
+        getStateInspector().inspectOpenedResource(getReceiver());
+    }
+
+    /**
+     * Hands an unmodifiable view of the receiver to the state inspector's
+     * closed-resource check.
+     */
+    @Override
+    protected void doClosedInspection() {
+        getStateInspector().inspectClosedResource(getReceiver());
+    }
+
+    /**
+     * Hands an unmodifiable view of the receiver to the state inspector's
+     * detached-resource check.
+     */
+    @Override
+    protected void doDetachedInspection() {
+        getStateInspector().inspectDetachedResource(getReceiver());
+    }
+
+    /**
+     * Populates the given Source with durability, expiry, outcomes, default outcome
+     * and any filters derived from this receiver's configuration.
+     *
+     * @param source
+     *        the Source instance to configure before it is attached to the link.
+     */
+    protected void configureSource(Source source) {
+        if (getSubscriptionName() == null || getSubscriptionName().isEmpty()) {
+            source.setDurable(TerminusDurability.NONE);
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+        } else {
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+            source.setDurable(TerminusDurability.UNSETTLED_STATE);
+            source.setDistributionMode(COPY);
+        }
+
+        source.setOutcomes(new Symbol[] {
+            ACCEPTED_DESCRIPTOR_SYMBOL,
+            REJECTED_DESCRIPTOR_SYMBOL,
+            RELEASED_DESCRIPTOR_SYMBOL,
+            MODIFIED_DESCRIPTOR_SYMBOL
+        });
+
+        // Default outcome: delivery failed, but still deliverable to this link.
+        Modified defaultOutcome = new Modified();
+        defaultOutcome.setDeliveryFailed(true);
+        defaultOutcome.setUndeliverableHere(false);
+        source.setDefaultOutcome(defaultOutcome);
+
+        Map<Symbol, DescribedType> filterMap = new HashMap<Symbol, DescribedType>();
+
+        if (isNoLocal()) {
+            filterMap.put(NO_LOCAL_NAME, AmqpNoLocalType.NO_LOCAL);
+        }
+
+        if (getSelector() != null && !getSelector().trim().equals("")) {
+            filterMap.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(getSelector()));
+        }
+
+        // Only set a filter map when at least one filter was requested.
+        if (!filterMap.isEmpty()) {
+            source.setFilter(filterMap);
+        }
+    }
+
+    @Override
+    public void processDeliveryUpdates(AmqpConnection connection) throws 
IOException {
+        Delivery incoming = null;
+        do {
+            incoming = getEndpoint().current();
+            if (incoming != null) {
+                if(incoming.isReadable() && !incoming.isPartial()) {
+                    LOG.trace("{} has incoming Message(s).", this);
+                    try {
+                        processDelivery(incoming);
+                    } catch (Exception e) {
+                        throw IOExceptionSupport.create(e);
+                    }
+                    getEndpoint().advance();
+                } else {
+                    LOG.trace("{} has a partial incoming Message(s), 
deferring.", this);
+                    incoming = null;
+                }
+            }
+        } while (incoming != null);
+
+        super.processDeliveryUpdates(connection);
+    }
+
+    /**
+     * Decodes the given delivery and queues the resulting message in the prefetch
+     * buffer.  If decoding fails the delivery is settled as failed, one credit is
+     * returned to the link and nothing is queued.
+     *
+     * @param incoming
+     *        the complete, readable delivery to process.
+     *
+     * @throws Exception if an error occurs while handling the delivery.
+     */
+    private void processDelivery(Delivery incoming) throws Exception {
+        Message message = null;
+        try {
+            message = decodeIncomingMessage(incoming);
+        } catch (Exception e) {
+            // Pass the exception as the trailing argument so the stack trace is logged
+            // along with the message instead of being discarded.
+            LOG.warn("Error on transform: {}", e.getMessage(), e);
+            deliveryFailed(incoming, true);
+            return;
+        }
+
+        AmqpMessage amqpMessage = new AmqpMessage(this, message, incoming);
+        // Store reference to envelope in delivery context for recovery
+        incoming.setContext(amqpMessage);
+        prefetch.add(amqpMessage);
+    }
+
+    /**
+     * Reads all available bytes for the current delivery from the endpoint and
+     * decodes them into a Proton Message.
+     *
+     * @param incoming
+     *        the delivery being decoded; the bytes themselves are read from the endpoint.
+     *
+     * @return the decoded Proton Message.
+     */
+    protected Message decodeIncomingMessage(Delivery incoming) {
+        int count;
+
+        byte[] chunk = new byte[2048];
+        // Closing a ByteArrayOutputStream has no effect, so no close (and no
+        // swallowed IOException) is needed for this in-memory buffer.
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+        while ((count = getEndpoint().recv(chunk, 0, chunk.length)) > 0) {
+            stream.write(chunk, 0, count);
+        }
+
+        byte[] messageBytes = stream.toByteArray();
+
+        Message protonMessage = Message.Factory.create();
+        protonMessage.decode(messageBytes, 0, messageBytes.length);
+        return protonMessage;
+    }
+
+    /**
+     * Settles the given delivery with a Modified disposition that marks it as both
+     * failed and undeliverable here, optionally granting one extra credit.
+     *
+     * @param incoming
+     *        the delivery to mark as failed and settle.
+     * @param expandCredit
+     *        true to flow one additional credit to the link after settling.
+     */
+    protected void deliveryFailed(Delivery incoming, boolean expandCredit) {
+        Modified failure = new Modified();
+        failure.setDeliveryFailed(true);
+        failure.setUndeliverableHere(true);
+
+        incoming.disposition(failure);
+        incoming.settle();
+
+        if (!expandCredit) {
+            return;
+        }
+        getEndpoint().flow(1);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "{ address = " + address + "}";
+    }
+
+    /**
+     * Guard used by the public operations: fails when the resource reports closed.
+     *
+     * @throws IllegalStateException if isClosed() returns true.
+     */
+    private void checkClosed() {
+        if (!isClosed()) {
+            return;
+        }
+
+        throw new IllegalStateException("Receiver is already closed");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
new file mode 100644
index 0000000..b4e6215
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+
+/**
+ * AmqpResource specification.
+ *
+ * All AMQP types should implement this interface to allow for control of state
+ * and configuration details.
+ */
+public interface AmqpResource {
+
+    /**
+     * Perform all the work needed to open this resource and store the request
+     * until such time as the remote peer indicates the resource has become active.
+     *
+     * @param request
+     *        The initiating request that triggered this open call.
+     */
+    void open(AsyncResult request);
+
+    /**
+     * @return true if the resource has moved to the opened state on the remote.
+     */
+    boolean isOpen();
+
+    /**
+     * Called to indicate that this resource is now remotely opened.  Once opened a
+     * resource can start accepting incoming requests.
+     */
+    void opened();
+
+    /**
+     * Perform all work needed to close this resource and store the request
+     * until such time as the remote peer indicates the resource has been closed.
+     *
+     * @param request
+     *        The initiating request that triggered this close call.
+     */
+    void close(AsyncResult request);
+
+    /**
+     * @return true if the resource has moved to the closed state on the remote.
+     */
+    boolean isClosed();
+
+    /**
+     * Called to indicate that this resource is now remotely closed.  Once closed a
+     * resource can not accept any incoming requests.
+     */
+    void closed();
+
+    /**
+     * Sets the failed state for this Resource and triggers a failure signal for
+     * any pending request.
+     */
+    void failed();
+
+    /**
+     * Called to indicate that the remote end has become closed but the resource
+     * was not awaiting a close.  This could happen during an open request where
+     * the remote does not set an error condition or during normal operation.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     */
+    void remotelyClosed(AmqpConnection connection);
+
+    /**
+     * Sets the failed state for this Resource and triggers a failure signal for
+     * any pending request, recording the cause of the failure.
+     *
+     * @param cause
+     *        The Exception that triggered the failure.
+     */
+    void failed(Exception cause);
+
+    /**
+     * Event handler for remote peer open of this resource.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteOpen(AmqpConnection connection) throws IOException;
+
+    /**
+     * Event handler for remote peer detach of this resource.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteDetach(AmqpConnection connection) throws IOException;
+
+    /**
+     * Event handler for remote peer close of this resource.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteClose(AmqpConnection connection) throws IOException;
+
+    /**
+     * Called when the Proton Engine signals a Delivery related event has been triggered
+     * for the given endpoint.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processDeliveryUpdates(AmqpConnection connection) throws IOException;
+
+    /**
+     * Called when the Proton Engine signals a Flow related event has been triggered
+     * for the given endpoint.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processFlowUpdates(AmqpConnection connection) throws IOException;
+
+    /**
+     * @return true if the remote end has sent an error.
+     */
+    boolean hasRemoteError();
+
+    /**
+     * @return an Exception derived from the error state of the endpoint's Remote Condition.
+     */
+    Exception getRemoteError();
+
+    /**
+     * @return an Error message derived from the error state of the endpoint's Remote Condition.
+     */
+    String getRemoteErrorMessage();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
new file mode 100644
index 0000000..95b0743
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.InvalidDestinationException;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sender class that manages a Proton sender endpoint.
+ */
+public class AmqpSender extends AmqpAbstractResource<Sender> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpSender.class);
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
+    //TODO: Use constants available from Proton 0.9
+    private static final Symbol ACCEPTED_DESCRIPTOR_SYMBOL = 
Symbol.valueOf("amqp:accepted:list");
+    private static final Symbol REJECTED_DESCRIPTOR_SYMBOL = 
Symbol.valueOf("amqp:rejected:list");
+
+    public static final long DEFAULT_SEND_TIMEOUT = 15000;
+
+    private final AmqpTransferTagGenerator tagGenerator = new 
AmqpTransferTagGenerator(true);
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    private final AmqpSession session;
+    private final String address;
+    private final String senderId;
+
+    private boolean presettle;
+    private long sendTimeout = DEFAULT_SEND_TIMEOUT;
+
+    private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
+    private byte[] encodeBuffer = new byte[1024 * 8];
+
+    /**
+     * Create a new sender instance.
+     *
+     * @param session
+     *        The parent session that created this sender.
+     * @param address
+     *        The address that this sender produces to.
+     * @param senderId
+     *        The unique ID assigned to this sender.
+     */
+    public AmqpSender(AmqpSession session, String address, String senderId) {
+        this.session = session;
+        this.address = address;
+        this.senderId = senderId;
+    }
+
+    /**
+     * Sends the given message to this sender's assigned address.
+     *
+     * The send itself is executed on the session's scheduler; the calling thread
+     * then waits on the send request, applying the configured send timeout when
+     * that timeout is greater than zero.
+     *
+     * @param message
+     *        the message to send.
+     *
+     * @throws IOException if an error occurs during the send.
+     */
+    public void send(final AmqpMessage message) throws IOException {
+        checkClosed();
+
+        final ClientFuture pendingSend = new ClientFuture();
+        final Runnable sendTask = new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    doSend(message, pendingSend);
+                    session.pumpToProtonTransport();
+                } catch (Exception error) {
+                    // Release the waiting caller first, then notify the connection.
+                    pendingSend.onFailure(error);
+                    session.getConnection().fireClientException(error);
+                }
+            }
+        };
+
+        session.getScheduler().execute(sendTask);
+
+        final boolean useTimeout = sendTimeout > 0;
+        if (useTimeout) {
+            pendingSend.sync(sendTimeout, TimeUnit.MILLISECONDS);
+        } else {
+            pendingSend.sync();
+        }
+    }
+
+    /**
+     * Close the sender, a closed sender will throw exceptions if any further send
+     * calls are made.
+     *
+     * Only the first call performs the close; later calls return immediately. The
+     * close runs on the session's scheduler while the caller waits on the request.
+     *
+     * @throws IOException if an error occurs while closing the sender.
+     */
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            final ClientFuture request = new ClientFuture();
+            session.getScheduler().execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        checkClosed();
+                        close(request);
+                        session.pumpToProtonTransport();
+                    } catch (Exception e) {
+                        // Without this the request would never complete and the
+                        // caller blocked in sync() below would wait forever.
+                        request.onFailure(e);
+                    }
+                }
+            });
+
+            request.sync();
+        }
+    }
+
+    /**
+     * @return this sender's parent AmqpSession.
+     */
+    public AmqpSession getSession() {
+        return session;
+    }
+
+    /**
+     * @return an unmodifiable view of the underlying Sender instance.
+     */
+    public Sender getSender() {
+        return new UnmodifiableSender(getEndpoint());
+    }
+
+    /**
+     * @return the assigned address of this sender.
+     */
+    public String getAddress() {
+        return address;
+    }
+
+    //----- Sender configuration ---------------------------------------------//
+
+    /**
+     * @return true if messages will be settled on send.
+     */
+    public boolean isPresettle() {
+        return presettle;
+    }
+
+    /**
+     * Configure if sent messages are marked as settled on send, defaults to false.
+     *
+     * @param presettle
+     *        configure if this sender will presettle all sent messages.
+     */
+    public void setPresettle(boolean presettle) {
+        this.presettle = presettle;
+    }
+
+    /**
+     * @return the currently configured send timeout.
+     */
+    public long getSendTimeout() {
+        return sendTimeout;
+    }
+
+    /**
+     * Sets the amount of time the sender will block on a send before failing.
+     * When the value is zero or negative, send waits on the request without
+     * applying a timeout.
+     *
+     * @param sendTimeout
+     *        time in milliseconds to wait.
+     */
+    public void setSendTimeout(long sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+
+    //----- Private Sender implementation ------------------------------------//
+
+    private void checkClosed() {
+        if (isClosed()) {
+            throw new IllegalStateException("Sender is already closed");
+        }
+    }
+
+    /**
+     * Creates and configures the Proton sender link, then delegates to the base
+     * class to open it.
+     *
+     * The link is named "senderId:address", uses the sender ID as its source
+     * address and advertises the accepted and rejected outcomes on the source.
+     */
+    @Override
+    protected void doOpen() {
+        final Source linkSource = new Source();
+        linkSource.setAddress(senderId);
+        linkSource.setOutcomes(new Symbol[] { ACCEPTED_DESCRIPTOR_SYMBOL, REJECTED_DESCRIPTOR_SYMBOL });
+
+        final Target linkTarget = new Target();
+        linkTarget.setAddress(address);
+
+        final Sender link = session.getEndpoint().sender(senderId + ":" + address);
+        link.setSource(linkSource);
+        link.setTarget(linkTarget);
+        link.setSenderSettleMode(presettle ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED);
+        link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        setEndpoint(link);
+
+        super.doOpen();
+    }
+
+    /**
+     * Completes the open only when the attach response carried a non-null target.
+     */
+    @Override
+    protected void doOpenCompletion() {
+        if (getEndpoint().getRemoteTarget() == null) {
+            // No link terminus was created, the peer will now detach/close us.
+            return;
+        }
+
+        super.doOpenCompletion();
+    }
+
+    @Override
+    protected void doOpenInspection() {
+        getStateInspector().inspectOpenedResource(getSender());
+    }
+
+    @Override
+    protected void doClosedInspection() {
+        getStateInspector().inspectClosedResource(getSender());
+    }
+
+    @Override
+    protected void doDetachedInspection() {
+        getStateInspector().inspectDetachedResource(getSender());
+    }
+
+    /**
+     * @return an InvalidDestinationException when the attach response carried no
+     *         target, otherwise whatever the base class reports.
+     */
+    @Override
+    protected Exception getOpenAbortException() {
+        if (getEndpoint().getRemoteTarget() == null) {
+            // No link terminus was created, the peer has detach/closed us, create IDE.
+            return new InvalidDestinationException("Link creation was refused");
+        }
+
+        return super.getOpenAbortException();
+    }
+
+    /**
+     * Creates a delivery for the message, writes the encoded message to the link
+     * and either settles it immediately (presettle) or tracks it as pending until
+     * a remote outcome arrives.
+     *
+     * @param message
+     *        the message to send.
+     * @param request
+     *        the request to complete; stored as the delivery context so that
+     *        processDeliveryUpdates can complete it later.
+     *
+     * @throws Exception if an error occurs while encoding or sending the message.
+     */
+    private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
+
+        LOG.trace("Producer sending message: {}", message);
+
+        final Delivery delivery;
+        if (presettle) {
+            // Presettled deliveries carry an empty tag, so no tag is taken from
+            // the generator; it would never be returned to the pool.
+            delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
+        } else {
+            byte[] tag = tagGenerator.getNextTag();
+            delivery = getEndpoint().delivery(tag, 0, tag.length);
+        }
+
+        delivery.setContext(request);
+
+        encodeAndSend(message.getWrappedMessage(), delivery);
+
+        if (presettle) {
+            delivery.settle();
+            request.onSuccess();
+        } else {
+            pending.add(delivery);
+            getEndpoint().advance();
+        }
+    }
+
+    /**
+     * Encodes the message into the reusable encode buffer and writes the encoded
+     * bytes to the sender endpoint.
+     *
+     * @param message
+     *        the Proton message to encode.
+     * @param delivery
+     *        the delivery the bytes belong to (not read by this method).
+     *
+     * @throws IOException if the endpoint accepts no data for a non-empty message.
+     */
+    private void encodeAndSend(Message message, Delivery delivery) throws IOException {
+
+        int encodedSize;
+        while (true) {
+            try {
+                encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
+                break;
+            } catch (java.nio.BufferOverflowException e) {
+                // Buffer too small for this message: double it and encode again.
+                encodeBuffer = new byte[encodeBuffer.length * 2];
+            }
+        }
+
+        int sentSoFar = 0;
+
+        // Looping on the remaining byte count means a zero-length encoding sends
+        // nothing instead of spinning on a send that can never make progress.
+        while (sentSoFar < encodedSize) {
+            int sent = getEndpoint().send(encodeBuffer, sentSoFar, encodedSize - sentSoFar);
+            if (sent <= 0) {
+                // Fail the send rather than busy-looping on the scheduler thread.
+                throw new IOException(this + " failed to send any data from current Message.");
+            }
+            sentSoFar += sent;
+        }
+    }
+
+    /**
+     * Walks the pending deliveries and completes the send request attached to
+     * each one whose remote state carries an Accepted or Rejected outcome.
+     *
+     * @param connection
+     *        The connection that owns this resource; used to report a rejection
+     *        when no incomplete request is attached to the delivery.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    @Override
+    public void processDeliveryUpdates(AmqpConnection connection) throws 
IOException {
+        List<Delivery> toRemove = new ArrayList<Delivery>();
+
+        for (Delivery delivery : pending) {
+            DeliveryState state = delivery.getRemoteState();
+            if (state == null) {
+                // No remote state has been set on this delivery yet.
+                continue;
+            }
+
+            Outcome outcome = null;
+            if (state instanceof TransactionalState) {
+                LOG.trace("State of delivery is Transactional, retrieving 
outcome: {}", state);
+                outcome = ((TransactionalState) state).getOutcome();
+            } else if (state instanceof Outcome) {
+                outcome = (Outcome) state;
+            } else {
+                LOG.warn("Message send updated with unsupported state: {}", 
state);
+                continue;
+            }
+
+            AsyncResult request = (AsyncResult) delivery.getContext();
+
+            if (outcome instanceof Accepted) {
+                toRemove.add(delivery);
+                LOG.trace("Outcome of delivery was accepted: {}", delivery);
+                tagGenerator.returnTag(delivery.getTag());
+                if (request != null && !request.isComplete()) {
+                    request.onSuccess();
+                }
+            } else if (outcome instanceof Rejected) {
+                // NOTE(review): the failure comes from getRemoteError() rather than
+                // from the Rejected outcome itself - confirm this is intended.
+                Exception remoteError = getRemoteError();
+                toRemove.add(delivery);
+                LOG.trace("Outcome of delivery was rejected: {}", delivery);
+                tagGenerator.returnTag(delivery.getTag());
+                if (request != null && !request.isComplete()) {
+                    request.onFailure(remoteError);
+                } else {
+                    connection.fireClientException(getRemoteError());
+                }
+            } else {
+                // The delivery stays in pending and its request is not completed here.
+                LOG.warn("Message send updated with unsupported outcome: {}", 
outcome);
+            }
+        }
+
+        // Removal is deferred so pending is not modified while it is iterated.
+        pending.removeAll(toRemove);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "{ address = " + address + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
new file mode 100644
index 0000000..9368b26
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Session class that manages a Proton session endpoint.
+ */
+public class AmqpSession extends AmqpAbstractResource<Session> {
+
+    private final AtomicLong receiverIdGenerator = new AtomicLong();
+    private final AtomicLong senderIdGenerator = new AtomicLong();
+
+    private final AmqpConnection connection;
+    private final String sessionId;
+
+    /**
+     * Create a new session instance.
+     *
+     * @param connection
+     *                   The parent connection that created the session.
+     * @param sessionId
+     *        The unique ID value assigned to this session.
+     */
+    public AmqpSession(AmqpConnection connection, String sessionId) {
+        this.connection = connection;
+        this.sessionId = sessionId;
+    }
+
+    /**
+     * Create a sender instance using the given address
+     *
+     * The sender is opened on the connection's scheduler while the caller waits
+     * on the open request.
+     *
+     * @param address
+     *        the address to which the sender will produce its messages.
+     *
+     * @return a newly created sender that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the sender.
+     */
+    public AmqpSender createSender(final String address) throws Exception {
+        checkClosed();
+
+        final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId());
+        final ClientFuture request = new ClientFuture();
+
+        connection.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+                    sender.open(request);
+                    pumpToProtonTransport();
+                } catch (Exception e) {
+                    // Fail the request so the caller blocked in sync() below is
+                    // released instead of waiting forever.
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+
+        return sender;
+    }
+
+    /**
+     * Create a receiver instance using the given address
+     *
+     * The receiver is opened on the connection's scheduler while the caller waits
+     * on the open request.
+     *
+     * @param address
+     *        the address to which the receiver will subscribe for its messages.
+     *
+     * @return a newly created receiver that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the receiver.
+     */
+    public AmqpReceiver createReceiver(String address) throws Exception {
+        checkClosed();
+
+        final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId());
+        final ClientFuture request = new ClientFuture();
+
+        connection.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+                    receiver.open(request);
+                    pumpToProtonTransport();
+                } catch (Exception e) {
+                    // Fail the request so the caller blocked in sync() below is
+                    // released instead of waiting forever.
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+
+        return receiver;
+    }
+
+    /**
+     * @return this session's parent AmqpConnection.
+     */
+    public AmqpConnection getConnection() {
+        return connection;
+    }
+
+    public Session getSession() {
+        return new UnmodifiableSession(getEndpoint());
+    }
+
+    //----- Internal getters used from the child AmqpResource classes --------//
+
+    ScheduledExecutorService getScheduler() {
+        return connection.getScheduler();
+    }
+
+    Connection getProtonConnection() {
+        return connection.getProtonConnection();
+    }
+
+    void pumpToProtonTransport() {
+        connection.pumpToProtonTransport();
+    }
+
+    //----- Private implementation details -----------------------------------//
+
+    @Override
+    protected void doOpenInspection() {
+        getStateInspector().inspectOpenedResource(getSession());
+    }
+
+    @Override
+    protected void doClosedInspection() {
+        getStateInspector().inspectClosedResource(getSession());
+    }
+
+    private String getNextSenderId() {
+        return sessionId + ":" + senderIdGenerator.incrementAndGet();
+    }
+
+    private String getNextReceiverId() {
+        return sessionId + ":" + receiverIdGenerator.incrementAndGet();
+    }
+
+    private void checkClosed() {
+        if (isClosed()) {
+            throw new IllegalStateException("Session is already closed");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpSession { " + sessionId + " }";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java
new file mode 100644
index 0000000..5471876
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Base for a validation hook that is used in tests to check the state of a
+ * remote resource after a variety of lifecycle events.
+ *
+ * Every inspection callback in this class does nothing; a subclass can override
+ * the ones it needs and record a failure through markAsInvalid.
+ */
+public class AmqpStateInspector {
+
+    private boolean valid = true;
+    private String errorMessage;
+
+    /** Inspection hook for an opened Connection; does nothing by default. */
+    public void inspectOpenedResource(Connection connection) {
+        // No-op by default.
+    }
+
+    /** Inspection hook for an opened Session; does nothing by default. */
+    public void inspectOpenedResource(Session session) {
+        // No-op by default.
+    }
+
+    /** Inspection hook for an opened Link; does nothing by default. */
+    public void inspectOpenedResource(Link link) {
+        // No-op by default.
+    }
+
+    /** Inspection hook for a closed Connection; does nothing by default. */
+    public void inspectClosedResource(Connection remoteConnection) {
+        // No-op by default.
+    }
+
+    /** Inspection hook for a closed Session; does nothing by default. */
+    public void inspectClosedResource(Session session) {
+        // No-op by default.
+    }
+
+    /** Inspection hook for a closed Link; does nothing by default. */
+    public void inspectClosedResource(Link link) {
+        // No-op by default.
+    }
+
+    /** Inspection hook for a detached Link; does nothing by default. */
+    public void inspectDetachedResource(Link link) {
+        // No-op by default.
+    }
+
+    /**
+     * @return false once a check has been marked as failed, true otherwise.
+     */
+    public boolean isValid() {
+        return this.valid;
+    }
+
+    /**
+     * @param valid
+     *        the new validity state of this inspector.
+     */
+    protected void setValid(boolean valid) {
+        this.valid = valid;
+    }
+
+    /**
+     * @return the currently stored error message, null when none was set.
+     */
+    public String getErrorMessage() {
+        return this.errorMessage;
+    }
+
+    /**
+     * @param errorMessage
+     *        the error message to store.
+     */
+    protected void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    /**
+     * Marks this inspector as failed and stores the given message. Has no effect
+     * when the inspector is already invalid, so the first message is kept.
+     *
+     * @param errorMessage
+     *        the message describing the failed check.
+     */
+    protected void markAsInvalid(String errorMessage) {
+        if (!valid) {
+            return;
+        }
+
+        setValid(false);
+        setErrorMessage(errorMessage);
+    }
+
+    /**
+     * Throws an AssertionError carrying the stored error message when this
+     * inspector is no longer valid.
+     */
+    public void assertIfStateChecksFailed() {
+        if (isValid()) {
+            return;
+        }
+
+        throw new AssertionError(errorMessage);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
new file mode 100644
index 0000000..08db018
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * Utility class that can generate and if enabled pool the binary tag values
+ * used to identify transfers over an AMQP link.
+ *
+ * New tags are the UTF-8 bytes of an increasing counter rendered as hex. The
+ * pool holds byte arrays in a Set, and arrays hash by identity, so handing the
+ * same array back twice stores it only once.
+ */
+public final class AmqpTransferTagGenerator {
+
+    public static final int DEFAULT_TAG_POOL_SIZE = 1024;
+
+    private long nextTagId;
+    private int maxPoolSize = DEFAULT_TAG_POOL_SIZE;
+
+    // Null when pooling is disabled.
+    private final Set<byte[]> tagPool;
+
+    /**
+     * Creates a generator that does not pool returned tags.
+     */
+    public AmqpTransferTagGenerator() {
+        this(false);
+    }
+
+    /**
+     * Creates a generator.
+     *
+     * @param pool
+     *        true if returned tags should be pooled and reused.
+     */
+    public AmqpTransferTagGenerator(boolean pool) {
+        if (pool) {
+            this.tagPool = new LinkedHashSet<byte[]>();
+        } else {
+            this.tagPool = null;
+        }
+    }
+
+    /**
+     * Retrieves the next available tag.
+     *
+     * @return a new or unused tag depending on the pool option.
+     */
+    public byte[] getNextTag() {
+        byte[] rc;
+        if (tagPool != null && !tagPool.isEmpty()) {
+            // Reuse the tag that was returned first.
+            final Iterator<byte[]> iterator = tagPool.iterator();
+            rc = iterator.next();
+            iterator.remove();
+        } else {
+            try {
+                rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                // This should never happen since we control the input.
+                throw new RuntimeException(e);
+            }
+        }
+        return rc;
+    }
+
+    /**
+     * When used as a pooled cache of tags the unused tags should always be
+     * returned once the transfer has been settled.
+     *
+     * A null tag is ignored; pooling it would make a later getNextTag() hand out
+     * null. Tags are also dropped when pooling is disabled or the pool is full.
+     *
+     * @param data
+     *        a previously borrowed tag that is no longer in use.
+     */
+    public void returnTag(byte[] data) {
+        if (data == null) {
+            return;
+        }
+
+        if (tagPool != null && tagPool.size() < maxPoolSize) {
+            tagPool.add(data);
+        }
+    }
+
+    /**
+     * Gets the current max pool size value.
+     *
+     * @return the current max tag pool size.
+     */
+    public int getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    /**
+     * Sets the max tag pool size.  If the size is smaller than the current number
+     * of pooled tags the pool will drain over time until it matches the max.
+     *
+     * @param maxPoolSize
+     *        the maximum number of tags to hold in the pool.
+     */
+    public void setMaxPoolSize(int maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
new file mode 100644
index 0000000..953038a
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for SASL Authentication Mechanism that implements the basic
+ * methods of a Mechanism class: credential and property storage, ordering by
+ * priority and a readable name.
+ */
+public abstract class AbstractMechanism implements Mechanism {
+
+    /** Shared zero-length response for mechanisms that have nothing to send. */
+    protected static final byte[] EMPTY = new byte[0];
+
+    private String username;
+    private String password;
+    private Map<String, Object> properties = new HashMap<String, Object>();
+
+    /**
+     * Orders mechanisms by their priority value.
+     *
+     * @return -1, 1 or 0 when this mechanism's priority is lower than, higher
+     *         than or equal to the other's.
+     */
+    @Override
+    public int compareTo(Mechanism other) {
+        if (getPriority() < other.getPriority()) {
+            return -1;
+        }
+
+        return getPriority() > other.getPriority() ? 1 : 0;
+    }
+
+    @Override
+    public void setUsername(String value) {
+        username = value;
+    }
+
+    @Override
+    public String getUsername() {
+        return this.username;
+    }
+
+    @Override
+    public void setPassword(String value) {
+        password = value;
+    }
+
+    @Override
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Replaces the stored properties map with the given instance (no copy is made).
+     */
+    @Override
+    public void setProperties(Map<String, Object> properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    /**
+     * @return the mechanism name prefixed with "SASL-".
+     */
+    @Override
+    public String toString() {
+        final String mechanismName = getName();
+        return "SASL-" + mechanismName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
new file mode 100644
index 0000000..8ac8b61
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+/**
+ * Implements the Anonymous SASL authentication mechanism.
+ *
+ * Both the initial response and every challenge response are the shared empty
+ * byte array; no user name or password is read.
+ */
+public class AnonymousMechanism extends AbstractMechanism {
+
+    /**
+     * @return the empty response.
+     */
+    @Override
+    public byte[] getInitialResponse() {
+        return EMPTY;
+    }
+
+    /**
+     * @param challenge
+     *        the challenge bytes, ignored by this mechanism.
+     *
+     * @return the empty response.
+     */
+    @Override
+    public byte[] getChallengeResponse(byte[] challenge) {
+        return EMPTY;
+    }
+
+    /**
+     * @return the value of the LOWEST priority constant.
+     */
+    @Override
+    public int getPriority() {
+        return PRIORITY.LOWEST.getValue();
+    }
+
+    /**
+     * @return the mechanism name, "ANONYMOUS".
+     */
+    @Override
+    public String getName() {
+        return "ANONYMOUS";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
new file mode 100644
index 0000000..638fb2e
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import java.io.UnsupportedEncodingException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.security.sasl.SaslException;
+
+/**
+ * Implements the SASL CRAM-MD5 authentication Mechanism.
+ *
+ * The password itself is not placed in the response. It is used as the
+ * key of an HMAC-MD5 digest computed over the challenge bytes, and the
+ * response is the user name, a single space, and the digest written as
+ * lower case hexadecimal.
+ */
+public class CramMD5Mechanism extends AbstractMechanism {
+
+    private static final String ASCII = "ASCII";
+    private static final String HMACMD5 = "HMACMD5";
+
+    // Set once a digest has been produced; later challenges get EMPTY.
+    private boolean sentResponse;
+
+    /**
+     * @return the value of {@code PRIORITY.HIGH}.
+     */
+    @Override
+    public int getPriority() {
+        return PRIORITY.HIGH.getValue();
+    }
+
+    /**
+     * @return the mechanism name, {@code "CRAM-MD5"}.
+     */
+    @Override
+    public String getName() {
+        return "CRAM-MD5";
+    }
+
+    /**
+     * @return the {@code EMPTY} buffer; the digest is only produced in
+     *         answer to a challenge.
+     */
+    @Override
+    public byte[] getInitialResponse() {
+        return EMPTY;
+    }
+
+    /**
+     * Answers the first non-empty challenge with the CRAM-MD5 digest
+     * response.  A null or empty challenge, or any challenge received
+     * after the digest has been produced, is answered with {@code EMPTY}.
+     *
+     * @param challenge
+     *        the challenge bytes received from the remote peer.
+     *
+     * @return the ASCII encoded response, or {@code EMPTY}.
+     *
+     * @throws SaslException if the user name is null, the password is
+     *         null or empty, or the digest cannot be computed.
+     */
+    @Override
+    public byte[] getChallengeResponse(byte[] challenge) throws SaslException {
+        if (sentResponse || challenge == null || challenge.length == 0) {
+            return EMPTY;
+        }
+
+        String username = getUsername();
+        String password = getPassword();
+
+        // Without this check a missing credential escapes as an unchecked
+        // NullPointerException, and an empty password as the
+        // IllegalArgumentException SecretKeySpec throws for an empty key.
+        if (username == null || password == null || password.isEmpty()) {
+            throw new SaslException(
+                "CRAM-MD5 requires a user name and a non-empty password");
+        }
+
+        try {
+            SecretKeySpec key =
+                new SecretKeySpec(password.getBytes(ASCII), HMACMD5);
+            Mac mac = Mac.getInstance(HMACMD5);
+            mac.init(key);
+
+            byte[] digest = mac.doFinal(challenge);
+
+            // No synchronization is needed for this local buffer.
+            StringBuilder hash = new StringBuilder(username);
+            hash.append(' ');
+            for (byte b : digest) {
+                String hex = Integer.toHexString(0xFF & b);
+                if (hex.length() == 1) {
+                    // Keep two hex digits per byte.
+                    hash.append('0');
+                }
+                hash.append(hex);
+            }
+
+            sentResponse = true;
+            return hash.toString().getBytes(ASCII);
+        } catch (UnsupportedEncodingException e) {
+            throw new SaslException("Unable to utilise required encoding", e);
+        } catch (InvalidKeyException e) {
+            throw new SaslException("Unable to utilise key", e);
+        } catch (NoSuchAlgorithmException e) {
+            throw new SaslException("Unable to utilise required algorithm", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
new file mode 100644
index 0000000..e1296f9
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import java.util.Map;
+
+import javax.security.sasl.SaslException;
+
+/**
+ * Contract implemented by every SASL authentication mechanism.
+ *
+ * Mechanisms are {@link Comparable} so that a set of candidates can be
+ * sorted into an order of preference.
+ */
+public interface Mechanism extends Comparable<Mechanism> {
+
+    /**
+     * Relative priority levels used to rank the SASL mechanisms that were
+     * found, where the level of security generally defines the preference.
+     */
+    enum PRIORITY {
+        LOWEST(0),
+        LOW(1),
+        MEDIUM(2),
+        HIGH(3),
+        HIGHEST(4);
+
+        private final int level;
+
+        PRIORITY(int level) {
+            this.level = level;
+        }
+
+        /**
+         * @return the numeric value associated with this priority level.
+         */
+        public int getValue() {
+            return level;
+        }
+    }
+
+    /**
+     * @return the relative priority of this SASL mechanism.
+     */
+    int getPriority();
+
+    /**
+     * @return the well known name of this SASL mechanism.
+     */
+    String getName();
+
+    /**
+     * Produces the data sent along with the initial SASL exchange.
+     *
+     * @return the response buffer used to answer the initial SASL cycle.
+     *
+     * @throws SaslException if an error occurs computing the response.
+     */
+    byte[] getInitialResponse() throws SaslException;
+
+    /**
+     * Creates a response to a challenge issued by the remote peer.
+     *
+     * @param challenge
+     *        the challenge that this Mechanism should respond to.
+     *
+     * @return the response that answers the given challenge.
+     *
+     * @throws SaslException if an error occurs computing the response.
+     */
+    byte[] getChallengeResponse(byte[] challenge) throws SaslException;
+
+    /**
+     * Sets the user name value for this Mechanism.  A Mechanism may ignore
+     * this value if it does not use a user name in its authentication
+     * processing.
+     *
+     * @param value
+     *        the user name given.
+     */
+    void setUsername(String value);
+
+    /**
+     * @return the currently set user name value for this Mechanism.
+     */
+    String getUsername();
+
+    /**
+     * Sets the password value for this Mechanism.  A Mechanism may ignore
+     * this value if it does not use a password in its authentication
+     * processing.
+     *
+     * @param value
+     *        the password given.
+     */
+    void setPassword(String value);
+
+    /**
+     * @return the currently set password value for this Mechanism.
+     */
+    String getPassword();
+
+    /**
+     * Sets any additional Mechanism specific properties, supplied as a
+     * {@code Map<String, Object>}.
+     *
+     * @param options
+     *        the map of additional properties this Mechanism should use.
+     */
+    void setProperties(Map<String, Object> options);
+
+    /**
+     * @return the current set of configuration properties for this
+     *         Mechanism.
+     */
+    Map<String, Object> getProperties();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
new file mode 100644
index 0000000..ce26124
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+/**
+ * Implements the SASL PLAIN authentication Mechanism.
+ *
+ * User name and Password values are sent without being encrypted.
+ */
+public class PlainMechanism extends AbstractMechanism {
+
+    // RFC 4616 defines the PLAIN message fields as UTF-8.  Naming the
+    // charset explicitly keeps the encoding independent of the platform
+    // default, which matters for non-ASCII credentials.
+    private static final java.nio.charset.Charset CREDENTIAL_CHARSET =
+        java.nio.charset.Charset.forName("UTF-8");
+
+    /**
+     * @return the value of {@code PRIORITY.MEDIUM}.
+     */
+    @Override
+    public int getPriority() {
+        return PRIORITY.MEDIUM.getValue();
+    }
+
+    /**
+     * @return the mechanism name, {@code "PLAIN"}.
+     */
+    @Override
+    public String getName() {
+        return "PLAIN";
+    }
+
+    /**
+     * Builds the PLAIN initial response: a zero byte, the user name
+     * bytes, a zero byte, then the password bytes.  A null user name or
+     * password is treated as the empty string.
+     *
+     * @return the encoded credentials buffer.
+     */
+    @Override
+    public byte[] getInitialResponse() {
+        byte[] usernameBytes = encode(getUsername());
+        byte[] passwordBytes = encode(getPassword());
+
+        // The leading byte (empty authorization identity) and the
+        // separator between the two values are the zero bytes the new
+        // array already holds; only the two values are copied in.
+        byte[] data =
+            new byte[usernameBytes.length + passwordBytes.length + 2];
+        System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
+        System.arraycopy(passwordBytes, 0, data,
+            2 + usernameBytes.length, passwordBytes.length);
+        return data;
+    }
+
+    /**
+     * @param challenge
+     *        the challenge received from the peer; it is not inspected.
+     *
+     * @return the {@code EMPTY} buffer regardless of the challenge.
+     */
+    @Override
+    public byte[] getChallengeResponse(byte[] challenge) {
+        return EMPTY;
+    }
+
+    // Encodes a credential as UTF-8, mapping null to an empty array.
+    private static byte[] encode(String value) {
+        String text = (value == null) ? "" : value;
+        return text.getBytes(CREDENTIAL_CHARSET);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
new file mode 100644
index 0000000..818ddff
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.jms.JMSSecurityException;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.proton.engine.Sasl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Drives the client side of a SASL handshake using the Proton
+ * {@link Sasl} object it is given.
+ */
+public class SaslAuthenticator {
+
+    private static final Logger LOG =
+        LoggerFactory.getLogger(SaslAuthenticator.class);
+
+    private final Sasl sasl;
+    private final String username;
+    private final String password;
+
+    // Selected during the init phase and used to answer challenges in
+    // the step phase.
+    private Mechanism mechanism;
+
+    /**
+     * Create the authenticator and initialize it.
+     *
+     * @param sasl
+     *        The Proton SASL entry point used to manage the authentication.
+     * @param username
+     *        The user name that will be used to authenticate.
+     * @param password
+     *        The password that will be used to authenticate.
+     */
+    public SaslAuthenticator(Sasl sasl, String username, String password) {
+        this.sasl = sasl;
+        this.username = username;
+        this.password = password;
+    }
+
+    /**
+     * Process the SASL authentication cycle until such time as an outcome
+     * is determined.  This method must be called by the managing entity
+     * until the return value is true, indicating a successful
+     * authentication, or a SecurityException is thrown, indicating that
+     * the handshake failed.
+     *
+     * @return true once the SASL state reports a pass, false otherwise.
+     *
+     * @throws SecurityException if the handshake fails.
+     */
+    public boolean authenticate() throws SecurityException {
+        boolean authenticated = false;
+
+        switch (sasl.getState()) {
+            case PN_SASL_PASS:
+                authenticated = true;
+                break;
+            case PN_SASL_IDLE:
+                handleSaslInit();
+                break;
+            case PN_SASL_STEP:
+                handleSaslStep();
+                break;
+            case PN_SASL_FAIL:
+                handleSaslFail();
+                break;
+            default:
+                break;
+        }
+
+        return authenticated;
+    }
+
+    // Picks a mechanism from those the remote peer offers and sends its
+    // initial response, if it has one.
+    private void handleSaslInit() throws SecurityException {
+        try {
+            String[] offered = sasl.getRemoteMechanisms();
+            if (offered == null || offered.length == 0) {
+                // Nothing offered: leave the handshake untouched.
+                return;
+            }
+
+            mechanism = findMatchingMechanism(offered);
+            if (mechanism == null) {
+                // TODO - Better error message.
+                throw new SecurityException(
+                    "Could not find a matching SASL mechanism for the remote peer.");
+            }
+
+            mechanism.setUsername(username);
+            mechanism.setPassword(password);
+            // TODO - set additional options from URI.
+            // TODO - set a host value.
+
+            sasl.setMechanisms(mechanism.getName());
+            byte[] initial = mechanism.getInitialResponse();
+            if (initial != null && initial.length != 0) {
+                sasl.send(initial, 0, initial.length);
+            }
+        } catch (SaslException se) {
+            // TODO - Better error message.
+            throw new SecurityException(
+                "Exception while processing SASL init.", se);
+        }
+    }
+
+    // Maps the offered names onto the locally supported mechanisms and
+    // returns the one that sorts last, or null when none is supported.
+    private Mechanism findMatchingMechanism(String...remoteMechanisms) {
+
+        List<Mechanism> candidates = new ArrayList<Mechanism>();
+
+        for (String offeredName : remoteMechanisms) {
+            if (offeredName.equalsIgnoreCase("PLAIN")) {
+                candidates.add(new PlainMechanism());
+            } else if (offeredName.equalsIgnoreCase("ANONYMOUS")) {
+                candidates.add(new AnonymousMechanism());
+            } else if (offeredName.equalsIgnoreCase("CRAM-MD5")) {
+                candidates.add(new CramMD5Mechanism());
+            } else {
+                LOG.debug("Unknown remote mechanism {}, skipping",
+                    offeredName);
+            }
+        }
+
+        // Sorts using the Mechanism comparison; the last entry is the
+        // Mechanism deemed to be the highest priority match.
+        Collections.sort(candidates);
+        Mechanism match = candidates.isEmpty()
+            ? null : candidates.get(candidates.size() - 1);
+
+        LOG.info("Best match for SASL auth was: {}", match);
+
+        return match;
+    }
+
+    // Reads a pending challenge, if any, and sends the selected
+    // mechanism's response to it.
+    private void handleSaslStep() throws SecurityException {
+        try {
+            int pending = sasl.pending();
+            if (pending == 0) {
+                return;
+            }
+
+            byte[] challenge = new byte[pending];
+            sasl.recv(challenge, 0, challenge.length);
+            byte[] response = mechanism.getChallengeResponse(challenge);
+            sasl.send(response, 0, response.length);
+        } catch (SaslException se) {
+            // TODO - Better error message.
+            throw new SecurityException(
+                "Exception while processing SASL step.", se);
+        }
+    }
+
+    // Reports a failed SASL outcome to the caller.
+    private void handleSaslFail() throws SecurityException {
+        // TODO - Better error message.
+        throw new SecurityException("Client failed to authenticate");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
new file mode 100644
index 0000000..7327ba6
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+/**
+ * Callback contract used to report the outcome of an asynchronous
+ * operation.
+ */
+public interface AsyncResult {
+
+    /**
+     * Invoked when the operation fails.
+     *
+     * @param result
+     *        The error that caused this asynchronous operation to fail.
+     */
+    void onFailure(Throwable result);
+
+    /**
+     * Invoked when the operation succeeds.  No result value is passed;
+     * the call itself signals completion to the waiting parties.
+     */
+    void onSuccess();
+
+    /**
+     * Indicates whether the operation has finished.  An operation counts
+     * as complete whether it succeeded or failed.
+     *
+     * @return true if the asynchronous operation has completed.
+     */
+    boolean isComplete();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
new file mode 100644
index 0000000..9f83a1d
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.util.IOExceptionSupport;
+
+/**
+ * Asynchronous Client Future class.
+ *
+ * Completion, by success or failure, releases a single-count latch that
+ * the {@code sync} methods wait on.  A failure cause is remembered and
+ * rethrown to the waiter as an {@link IOException}.
+ */
+public class ClientFuture extends WrappedAsyncResult {
+
+    protected final CountDownLatch latch = new CountDownLatch(1);
+
+    // Assigned before the latch is released, read after it is awaited.
+    protected Throwable error;
+
+    /**
+     * Creates a future with no watcher; {@code null} is passed to the
+     * superclass constructor.
+     */
+    public ClientFuture() {
+        super(null);
+    }
+
+    /**
+     * Creates a future that hands the given watcher to the superclass.
+     *
+     * @param watcher
+     *        the result instance passed to the superclass constructor.
+     */
+    public ClientFuture(AsyncResult watcher) {
+        super(watcher);
+    }
+
+    /**
+     * @return true once the latch has been released by either outcome.
+     */
+    @Override
+    public boolean isComplete() {
+        return latch.getCount() == 0;
+    }
+
+    /**
+     * Records the failure cause, releases any waiters and then forwards
+     * the failure to the superclass.
+     *
+     * @param result
+     *        the error that caused the operation to fail.
+     */
+    @Override
+    public void onFailure(Throwable result) {
+        error = result;
+        latch.countDown();
+        super.onFailure(result);
+    }
+
+    /**
+     * Releases any waiters and then forwards the success to the
+     * superclass.
+     */
+    @Override
+    public void onSuccess() {
+        latch.countDown();
+        super.onSuccess();
+    }
+
+    /**
+     * Timed wait for a response to a pending operation.
+     *
+     * If the wait period elapses before the operation completes this
+     * method returns normally; use {@link #isComplete()} to tell a
+     * timeout apart from a successful completion.
+     *
+     * @param amount
+     *        The amount of time to wait before abandoning the wait.
+     * @param unit
+     *        The unit to use for this wait period.
+     *
+     * @throws IOException if the operation failed or the wait was
+     *         interrupted.
+     */
+    public void sync(long amount, TimeUnit unit) throws IOException {
+        try {
+            latch.await(amount, unit);
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so code further up the stack
+            // can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw IOExceptionSupport.create(e);
+        }
+
+        failOnError();
+    }
+
+    /**
+     * Waits for a response to some pending operation.
+     *
+     * @throws IOException if the operation failed or the wait was
+     *         interrupted.
+     */
+    public void sync() throws IOException {
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so code further up the stack
+            // can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw IOExceptionSupport.create(e);
+        }
+
+        failOnError();
+    }
+
+    // Rethrows a recorded failure cause, if any, as an IOException.
+    private void failOnError() throws IOException {
+        Throwable cause = error;
+        if (cause != null) {
+            throw IOExceptionSupport.create(cause);
+        }
+    }
+}

Reply via email to