http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
new file mode 100644
index 0000000..4253434
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSSecurityException;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFactory;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Sasl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, 
Connection> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpConnection.class);
+
+    private final AmqpJmsMessageFactory amqpMessageFactory;
+
+    private final URI remoteURI;
+    private final Map<JmsSessionId, AmqpSession> sessions = new 
HashMap<JmsSessionId, AmqpSession>();
+    private final Map<JmsDestination, AmqpTemporaryDestination> tempDests = 
new HashMap<JmsDestination, AmqpTemporaryDestination>();
+    private final AmqpProvider provider;
+    private boolean connected;
+    private AmqpSaslAuthenticator authenticator;
+    private final AmqpSession connectionSession;
+
+    private String queuePrefix;
+    private String topicPrefix;
+    private String tempQueuePrefix;
+    private String tempTopicPrefix;
+
+    /**
+     * Creates the AMQP connection resource wrapping the given Proton connection.
+     *
+     * The constructor stores the provider and its remote URI, creates the message
+     * factory for this connection, creates a SASL authenticator only when a Sasl
+     * instance is supplied, registers this object as the provider hint on the
+     * connection id, copies the destination prefixes from the connection info and
+     * finally creates (but does not open) the internal connection session.
+     *
+     * @param provider
+     *        the provider that owns this connection; also supplies the remote URI.
+     * @param protonConnection
+     *        the Proton connection handed to the super class as the endpoint.
+     * @param sasl
+     *        the Proton SASL instance, or null in which case no authenticator is created.
+     * @param info
+     *        the JMS connection information describing this connection.
+     */
+    public AmqpConnection(AmqpProvider provider, Connection protonConnection, 
Sasl sasl, JmsConnectionInfo info) {
+        super(info, protonConnection);
+
+        this.provider = provider;
+        this.remoteURI = provider.getRemoteURI();
+        this.amqpMessageFactory = new AmqpJmsMessageFactory(this);
+
+        if (sasl != null) {
+            this.authenticator = new AmqpSaslAuthenticator(sasl, info);
+        }
+
+        this.info.getConnectionId().setProviderHint(this);
+
+        this.queuePrefix = info.getQueuePrefix();
+        this.topicPrefix = info.getTopicPrefix();
+        this.tempQueuePrefix = info.getTempQueuePrefix();
+        this.tempTopicPrefix = info.getTempTopicPrefix();
+
+        // Create a Session for this connection that is used for Temporary Destinations
+        // and perhaps later on management and advisory monitoring.
+        // NOTE(review): -1 is presumably a reserved id for this internal session - confirm.
+        JmsSessionInfo sessionInfo = new JmsSessionInfo(this.info, -1);
+        sessionInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
+
+        this.connectionSession = new AmqpSession(this, sessionInfo);
+    }
+
+    @Override
+    protected void doOpen() {
+        this.endpoint.setContainer(info.getClientId());
+        this.endpoint.setHostname(remoteURI.getHost());
+    }
+
+    @Override
+    protected void doClose() {
+        // This override performs no work.
+    }
+
+    /**
+     * Creates a new AmqpSession bound to this connection.
+     *
+     * @param sessionInfo
+     *        the JMS session information used to construct the session.
+     *
+     * @return the newly constructed AmqpSession.
+     */
+    public AmqpSession createSession(JmsSessionInfo sessionInfo) {
+        return new AmqpSession(this, sessionInfo);
+    }
+
+    /**
+     * Creates a new AmqpTemporaryDestination that is attached to the internal
+     * connection session of this connection.
+     *
+     * @param destination
+     *        the JMS destination the temporary destination represents.
+     *
+     * @return the newly constructed AmqpTemporaryDestination.
+     */
+    public AmqpTemporaryDestination createTemporaryDestination(JmsDestination destination) {
+        return new AmqpTemporaryDestination(connectionSession, destination);
+    }
+
+    /**
+     * Called on receiving an event from Proton indicating a state change on the remote
+     * side of the Connection.
+     *
+     * Three independent checks are made, in this order:
+     * the first time isOpen() reports true the connection is marked connected and the
+     * internal connection session is opened, with opened() or failed() invoked from the
+     * session open callback; when the local end is ACTIVE, the remote end is not, and a
+     * remote error condition is present, the error is reported to the pending open
+     * request or to the provider; when both ends are CLOSED, closed() is invoked.
+     */
+    @Override
+    public void processStateChange() {
+
+        if (!connected && isOpen()) {
+            // The flag is set before the session open is requested so this branch runs once.
+            connected = true;
+            connectionSession.open(new AsyncResult() {
+
+                @Override
+                public boolean isComplete() {
+                    return connected;
+                }
+
+                @Override
+                public void onSuccess() {
+                    LOG.debug("AMQP Connection Session opened.");
+                    opened();
+                }
+
+                @Override
+                public void onFailure(Throwable result) {
+                    LOG.debug("AMQP Connection Session failed to open.");
+                    failed(IOExceptionSupport.create(result));
+                }
+            });
+        }
+
+        EndpointState localState = endpoint.getLocalState();
+        EndpointState remoteState = endpoint.getRemoteState();
+
+        // We are still active (connected or not) and something on the remote end has
+        // closed us, signal an error if one was sent.
+        if (localState == EndpointState.ACTIVE && remoteState != 
EndpointState.ACTIVE) {
+            if (endpoint.getRemoteCondition().getCondition() != null) {
+                LOG.info("Error condition detected on Connection open {}.", 
endpoint.getRemoteCondition().getCondition());
+                Exception remoteError = getRemoteError();
+                if (isAwaitingOpen()) {
+                    openRequest.onFailure(remoteError);
+                } else {
+                    provider.fireProviderException(remoteError);
+                }
+            }
+        }
+
+        // Transition cleanly to closed state.
+        if (localState == EndpointState.CLOSED && remoteState == 
EndpointState.CLOSED) {
+            LOG.debug("{} has been closed successfully.", this);
+            closed();
+        }
+    }
+
+    /**
+     * Advances the SASL authentication exchange while the connection has not yet been
+     * marked connected and an authenticator is still present.  Once the authenticator
+     * reports completion it is discarded; a JMSSecurityException raised by it is
+     * passed to failed().
+     */
+    public void processSaslAuthentication() {
+        if (!connected && authenticator != null) {
+            try {
+                final boolean finished = authenticator.authenticate();
+                if (finished) {
+                    authenticator = null;
+                }
+            } catch (JMSSecurityException ex) {
+                failed(ex);
+            }
+        }
+    }
+
+    void addTemporaryDestination(AmqpTemporaryDestination destination) {
+        tempDests.put(destination.getJmsDestination(), destination);
+    }
+
+    void removeTemporaryDestination(AmqpTemporaryDestination destination) {
+        tempDests.remove(destination.getJmsDestination());
+    }
+
+    void addSession(AmqpSession session) {
+        this.sessions.put(session.getSessionId(), session);
+    }
+
+    void removeSession(AmqpSession session) {
+        this.sessions.remove(session.getSessionId());
+    }
+
+    /**
+     * @return the JmsConnectionInfo this connection was created with.
+     */
+    public JmsConnectionInfo getConnectionInfo() {
+        return info;
+    }
+
+    /**
+     * @return the Proton Connection that serves as the endpoint of this resource.
+     */
+    public Connection getProtonConnection() {
+        return endpoint;
+    }
+
+    /**
+     * @return the remote URI that was obtained from the provider at construction time.
+     */
+    public URI getRemoteURI() {
+        return remoteURI;
+    }
+
+    /**
+     * @return the user name carried by the connection info.
+     */
+    public String getUsername() {
+        return info.getUsername();
+    }
+
+    /**
+     * @return the password carried by the connection info.
+     */
+    public String getPassword() {
+        return info.getPassword();
+    }
+
+    /**
+     * @return the AmqpProvider that created this connection.
+     */
+    public AmqpProvider getProvider() {
+        return provider;
+    }
+
+    /**
+     * @return the queue prefix held by this connection; initially copied from the
+     *         connection info.
+     */
+    public String getQueuePrefix() {
+        return this.queuePrefix;
+    }
+
+    /**
+     * Replaces the queue prefix held by this connection.
+     *
+     * @param queuePrefix
+     *        the new queue prefix value.
+     */
+    public void setQueuePrefix(String queuePrefix) {
+        this.queuePrefix = queuePrefix;
+    }
+
+    /**
+     * @return the topic prefix held by this connection; initially copied from the
+     *         connection info.
+     */
+    public String getTopicPrefix() {
+        return this.topicPrefix;
+    }
+
+    /**
+     * Replaces the topic prefix held by this connection.
+     *
+     * @param topicPrefix
+     *        the new topic prefix value.
+     */
+    public void setTopicPrefix(String topicPrefix) {
+        this.topicPrefix = topicPrefix;
+    }
+
+    /**
+     * @return the temporary queue prefix held by this connection; initially copied
+     *         from the connection info.
+     */
+    public String getTempQueuePrefix() {
+        return this.tempQueuePrefix;
+    }
+
+    /**
+     * Replaces the temporary queue prefix held by this connection.
+     *
+     * @param tempQueuePrefix
+     *        the new temporary queue prefix value.
+     */
+    public void setTempQueuePrefix(String tempQueuePrefix) {
+        this.tempQueuePrefix = tempQueuePrefix;
+    }
+
+    /**
+     * @return the temporary topic prefix held by this connection; initially copied
+     *         from the connection info.
+     */
+    public String getTempTopicPrefix() {
+        return this.tempTopicPrefix;
+    }
+
+    /**
+     * Replaces the temporary topic prefix held by this connection.
+     *
+     * @param tempTopicPrefix
+     *        the new temporary topic prefix value.
+     */
+    public void setTempTopicPrefix(String tempTopicPrefix) {
+        this.tempTopicPrefix = tempTopicPrefix;
+    }
+
+    /**
+     * Looks up the session that belongs to the given session id.  When the id carries
+     * an AmqpSession as its provider hint that instance is returned directly, otherwise
+     * the session map of this connection is consulted.
+     *
+     * @param sessionId
+     *        The JmsSessionId that's associated with the target session.
+     *
+     * @return the AmqpSession associated with the given id, or null when the id has no
+     *         AmqpSession hint and no session is registered under it.
+     */
+    public AmqpSession getSession(JmsSessionId sessionId) {
+        final Object hint = sessionId.getProviderHint();
+        return hint instanceof AmqpSession ? (AmqpSession) hint : sessions.get(sessionId);
+    }
+
+    /**
+     * @return the value of the provider's presettle consumers setting.
+     */
+    public boolean isPresettleConsumers() {
+        return this.provider.isPresettleConsumers();
+    }
+
+    /**
+     * @return the value of the provider's presettle producers setting.
+     */
+    public boolean isPresettleProducers() {
+        return this.provider.isPresettleProducers();
+    }
+
+    /**
+     * @return the AmqpJmsMessageFactory that was created for this Connection.
+     */
+    public AmqpJmsMessageFactory getAmqpMessageFactory() {
+        return amqpMessageFactory;
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpConnection { " + getConnectionInfo().getConnectionId() + " 
}";
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
new file mode 100644
index 0000000..3ecb58e
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsMessageId;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.jms.EncodedMessage;
+import org.apache.qpid.proton.jms.InboundTransformer;
+import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AMQP Consumer object that is used to manage JMS MessageConsumer semantics.
+ */
+public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, 
Receiver> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpConsumer.class);
+
+    protected static final Symbol COPY = Symbol.getSymbol("copy");
+    protected static final Symbol JMS_NO_LOCAL_SYMBOL = 
Symbol.valueOf("no-local");
+    protected static final Symbol JMS_SELECTOR_SYMBOL = 
Symbol.valueOf("jms-selector");
+
+    protected final AmqpSession session;
+    protected final InboundTransformer inboundTransformer =
+        new JMSMappingInboundTransformer(AmqpJMSVendor.INSTANCE);;
+    protected final Map<JmsMessageId, Delivery> delivered = new 
LinkedHashMap<JmsMessageId, Delivery>();
+    protected boolean presettle;
+
+    private final ByteArrayOutputStream streamBuffer = new 
ByteArrayOutputStream();
+    private final byte incomingBuffer[] = new byte[1024 * 64];
+
+    public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
+        super(info);
+        this.session = session;
+
+        // Add a shortcut back to this Consumer for quicker lookups
+        this.info.getConsumerId().setProviderHint(this);
+    }
+
+    /**
+     * Starts the consumer by issuing link credit equal to the configured prefetch
+     * size and then signals success on the given request.
+     *
+     * @param request
+     *        the request to complete once the credit has been issued.
+     */
+    public void start(AsyncResult request) {
+        final int credit = info.getPrefetchSize();
+        endpoint.flow(credit);
+        request.onSuccess();
+    }
+
+    @Override
+    protected void doOpen() {
+        JmsDestination destination  = info.getDestination();
+        String subscription = session.getQualifiedName(destination);
+
+        Source source = new Source();
+        source.setAddress(subscription);
+        Target target = new Target();
+
+        configureSource(source);
+
+        String receiverName = getConsumerId() + ":" + subscription;
+        if (info.getSubscriptionName() != null && 
!info.getSubscriptionName().isEmpty()) {
+            // In the case of Durable Topic Subscriptions the client must use 
the same
+            // receiver name which is derived from the subscription name 
property.
+            receiverName = info.getSubscriptionName();
+        }
+
+        endpoint = session.getProtonSession().receiver(receiverName);
+        endpoint.setSource(source);
+        endpoint.setTarget(target);
+        if (isPresettle()) {
+            endpoint.setSenderSettleMode(SenderSettleMode.SETTLED);
+        } else {
+            endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        }
+        endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+    }
+
+    /**
+     * Registers this consumer as a resource of its session and then delegates to the
+     * super class implementation.
+     */
+    @Override
+    public void opened() {
+        session.addResource(this);
+        super.opened();
+    }
+
+    /**
+     * Removes this consumer from the resources of its session and then delegates to
+     * the super class implementation.
+     */
+    @Override
+    public void closed() {
+        session.removeResource(this);
+        super.closed();
+    }
+
+    protected void configureSource(Source source) {
+        Map<Symbol, DescribedType> filters = new HashMap<Symbol, 
DescribedType>();
+
+        if (info.getSubscriptionName() != null && 
!info.getSubscriptionName().isEmpty()) {
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+            source.setDurable(TerminusDurability.UNSETTLED_STATE);
+            source.setDistributionMode(COPY);
+        } else {
+            source.setDurable(TerminusDurability.NONE);
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+        }
+
+        if (info.isNoLocal()) {
+            filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
+        }
+
+        if (info.getSelector() != null && 
!info.getSelector().trim().equals("")) {
+            filters.put(JMS_SELECTOR_SYMBOL, new 
AmqpJmsSelectorType(info.getSelector()));
+        }
+
+        if (!filters.isEmpty()) {
+            source.setFilter(filters);
+        }
+    }
+
+    /**
+     * Accepts and settles every delivery currently held in the delivered Map and then
+     * empties the Map.  Usually this is called as part of a client acknowledge
+     * session operation.
+     *
+     * Only messages that have already been acknowledged as delivered by the JMS
+     * framework will be in the delivered Map.  This means that the link credit
+     * would already have been given for these so we just need to settle them.
+     */
+    public void acknowledge() {
+        LOG.trace("Session Acknowledge for consumer: {}", info.getConsumerId());
+        for (Map.Entry<JmsMessageId, Delivery> entry : delivered.entrySet()) {
+            final Delivery pending = entry.getValue();
+            pending.disposition(Accepted.getInstance());
+            pending.settle();
+        }
+        delivered.clear();
+    }
+
+    /**
+     * Called to acknowledge a given delivery.  Depending on the Ack Mode that
+     * the consumer was created with this method can acknowledge more than just
+     * the target delivery.
+     *
+     * The Delivery is taken from the provider hint of the message id when present,
+     * otherwise it is looked up in the delivered Map; an unknown message is logged
+     * and ignored.  An unsupported (or null) ack type is logged and ignored.
+     *
+     * @param envelope
+     *        the delivery that is to be acknowledged.
+     * @param ackType
+     *        the type of acknowledgment to perform.
+     *
+     * @throws JMSException if an error occurs accessing the Message properties.
+     */
+    public void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
+        JmsMessageId messageId = envelope.getMessage().getFacade().getMessageId();
+        Delivery delivery = null;
+
+        if (messageId.getProviderHint() instanceof Delivery) {
+            delivery = (Delivery) messageId.getProviderHint();
+        } else {
+            delivery = delivered.get(messageId);
+            if (delivery == null) {
+                LOG.warn("Received Ack for unknown message: {}", messageId);
+                return;
+            }
+        }
+
+        // Enum constants are compared by identity so that a null ack type falls through
+        // to the warning branch instead of raising a NullPointerException.
+        if (ackType == ACK_TYPE.DELIVERED) {
+            LOG.debug("Delivered Ack of message: {}", messageId);
+            if (session.isTransacted()) {
+                Binary txnId = session.getTransactionContext().getAmqpTransactionId();
+                if (txnId != null) {
+                    TransactionalState txState = new TransactionalState();
+                    txState.setOutcome(Accepted.getInstance());
+                    txState.setTxnId(txnId);
+                    delivery.disposition(txState);
+                    session.getTransactionContext().registerTxConsumer(this);
+                }
+            }
+            // Presettled deliveries are not tracked for later settlement.
+            if (!isPresettle()) {
+                delivered.put(messageId, delivery);
+            }
+            sendFlowIfNeeded();
+        } else if (ackType == ACK_TYPE.CONSUMED) {
+            // A Consumer may not always send a delivered ACK so we need to check to
+            // ensure we don't add too much credit to the link.
+            if (isPresettle() || delivered.remove(messageId) == null) {
+                sendFlowIfNeeded();
+            }
+            LOG.debug("Consumed Ack of message: {}", messageId);
+            if (!delivery.isSettled()) {
+                delivery.disposition(Accepted.getInstance());
+                delivery.settle();
+            }
+        } else if (ackType == ACK_TYPE.REDELIVERED) {
+            Modified disposition = new Modified();
+            disposition.setUndeliverableHere(false);
+            disposition.setDeliveryFailed(true);
+            delivery.disposition(disposition);
+            delivery.settle();
+        } else if (ackType == ACK_TYPE.POISONED) {
+            deliveryFailed(delivery, false);
+        } else {
+            LOG.warn("Unsupported Ack Type {} for message: {}", ackType, messageId);
+        }
+    }
+
+    /**
+     * We only send more credits as the credit window dwindles to a certain 
point and
+     * then we open the window back up to full prefetch size.
+     */
+    private void sendFlowIfNeeded() {
+        if (info.getPrefetchSize() == 0) {
+            return;
+        }
+
+        int currentCredit = endpoint.getCredit();
+        if (currentCredit <= info.getPrefetchSize() * 0.2) {
+            endpoint.flow(info.getPrefetchSize() - currentCredit);
+        }
+    }
+
+    /**
+     * Redispatches every delivery held in the delivered Map, marking each envelope as
+     * redelivered first, and then empties the Map.
+     *
+     * @throws Exception if an error occurs while performing the recover.
+     */
+    public void recover() throws Exception {
+        LOG.debug("Session Recover for consumer: {}", info.getConsumerId());
+        for (Map.Entry<JmsMessageId, Delivery> entry : delivered.entrySet()) {
+            // TODO - increment redelivery counter and apply connection redelivery policy
+            //        to those messages that are past max redelivery.
+            final JmsInboundMessageDispatch redelivery =
+                (JmsInboundMessageDispatch) entry.getValue().getContext();
+            redelivery.onMessageRedelivered();
+            deliver(redelivery);
+        }
+        delivered.clear();
+    }
+
+    /**
+     * For a consumer whose prefetch value is set to zero this method will attempt to
+     * solicit a new message dispatch from the broker by granting a single credit when
+     * the link currently has none.
+     *
+     * @param timeout
+     *        not used by this implementation.
+     */
+    public void pull(long timeout) {
+        final boolean zeroPrefetch = info.getPrefetchSize() == 0;
+        if (!zeroPrefetch || endpoint.getCredit() != 0) {
+            return;
+        }
+
+        // expand the credit window by one.
+        endpoint.flow(1);
+    }
+
+    /**
+     * Processes every complete, readable delivery currently available on the receiver,
+     * advancing the link after each one.  Processing stops at the first delivery that
+     * is missing, not readable, or still partial.
+     *
+     * @throws IOException if processing of a delivery fails; the original error is
+     *         wrapped using IOExceptionSupport.
+     */
+    @Override
+    public void processDeliveryUpdates() throws IOException {
+        Delivery incoming = endpoint.current();
+        while (incoming != null && incoming.isReadable() && !incoming.isPartial()) {
+            LOG.trace("{} has incoming Message(s).", this);
+            try {
+                processDelivery(incoming);
+            } catch (Exception e) {
+                throw IOExceptionSupport.create(e);
+            }
+            endpoint.advance();
+            incoming = endpoint.current();
+        }
+
+        // Only report a deferral when a partial delivery is actually pending, not on
+        // every normal exit from the loop.
+        if (incoming != null && incoming.isPartial()) {
+            LOG.trace("{} has a partial incoming Message(s), deferring.", this);
+        }
+    }
+
+    private void processDelivery(Delivery incoming) throws Exception {
+        EncodedMessage encoded = readIncomingMessage(incoming);
+        JmsMessage message = null;
+        try {
+            message = (JmsMessage) inboundTransformer.transform(encoded);
+        } catch (Exception e) {
+            LOG.warn("Error on transform: {}", e.getMessage());
+            // TODO - We could signal provider error but not sure we want to 
fail
+            //        the connection just because we can't convert the message.
+            //        In the future once the JMS mapping is complete we should 
be
+            //        able to convert everything to some message even if its 
just
+            //        a bytes messages as a fall back.
+            deliveryFailed(incoming, true);
+            return;
+        }
+
+        try {
+            message.setJMSDestination(info.getDestination());
+        } catch (JMSException e) {
+            LOG.warn("Error on transform: {}", e.getMessage());
+            // TODO - We could signal provider error but not sure we want to 
fail
+            //        the connection just because we can't convert the 
destination.
+            deliveryFailed(incoming, true);
+            return;
+        }
+
+        // Store link to delivery in the hint for use in acknowledge requests.
+        message.getFacade().getMessageId().setProviderHint(incoming);
+
+        // We need to signal to the create message that it's being dispatched 
and for now
+        // the transformer creates the message in write mode, onSend will 
reset it to read
+        // mode and the consumer will see it as a normal received message.
+        message.onSend();
+
+        JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch();
+        envelope.setMessage(message);
+        envelope.setConsumerId(info.getConsumerId());
+        envelope.setProviderHint(incoming);
+
+        // Store reference to envelope in delivery context for recovery
+        incoming.setContext(envelope);
+
+        deliver(envelope);
+    }
+
+    @Override
+    protected void doClose() {
+        // This override performs no work.
+    }
+
+    /**
+     * @return the AmqpSession this consumer was created with.
+     */
+    public AmqpSession getSession() {
+        return session;
+    }
+
+    /**
+     * @return the JmsConsumerId taken from the consumer info.
+     */
+    public JmsConsumerId getConsumerId() {
+        return info.getConsumerId();
+    }
+
+    /**
+     * @return the Proton Receiver that serves as the endpoint of this consumer.
+     */
+    public Receiver getProtonReceiver() {
+        return endpoint;
+    }
+
+    /**
+     * @return always false for this class.
+     */
+    public boolean isBrowser() {
+        return false;
+    }
+
+    /**
+     * @return the current value of the presettle flag of this consumer.
+     */
+    public boolean isPresettle() {
+        return this.presettle;
+    }
+
+    /**
+     * Sets the presettle flag of this consumer.
+     *
+     * @param presettle
+     *        the new value of the presettle flag.
+     */
+    public void setPresettle(boolean presettle) {
+        this.presettle = presettle;
+    }
+
+    /**
+     * @return a short description of this consumer that contains its consumer id.
+     */
+    @Override
+    public String toString() {
+        final Object consumerId = this.info.getConsumerId();
+        return "AmqpConsumer { " + consumerId + " }";
+    }
+
+    protected void deliveryFailed(Delivery incoming, boolean expandCredit) {
+        Modified disposition = new Modified();
+        disposition.setUndeliverableHere(true);
+        disposition.setDeliveryFailed(true);
+        incoming.disposition(disposition);
+        incoming.settle();
+        if (expandCredit) {
+            endpoint.flow(1);
+        }
+    }
+
+    protected void deliver(JmsInboundMessageDispatch envelope) throws 
Exception {
+        ProviderListener listener = 
session.getProvider().getProviderListener();
+        if (listener != null) {
+            if (envelope.getMessage() != null) {
+                LOG.debug("Dispatching received message: {}", 
envelope.getMessage().getFacade().getMessageId());
+            } else {
+                LOG.debug("Dispatching end of browse to: {}", 
envelope.getConsumerId());
+            }
+            listener.onMessage(envelope);
+        } else {
+            LOG.error("Provider listener is not set, message will be 
dropped.");
+        }
+    }
+
+    protected EncodedMessage readIncomingMessage(Delivery incoming) {
+        Buffer buffer;
+        int count;
+
+        while ((count = endpoint.recv(incomingBuffer, 0, 
incomingBuffer.length)) > 0) {
+            streamBuffer.write(incomingBuffer, 0, count);
+        }
+
+        buffer = streamBuffer.toBuffer();
+
+        try {
+            return new EncodedMessage(incoming.getMessageFormat(), 
buffer.data, buffer.offset, buffer.length);
+        } finally {
+            streamBuffer.reset();
+        }
+    }
+
+    public void preCommit() {
+        // This method performs no work.
+    }
+
+    public void preRollback() {
+        // This method performs no work.
+    }
+
+    /**
+     * Settles every delivery held in the delivered Map and then empties the Map so
+     * that no delivered state is carried over once the TX state is cleared and the
+     * next TX started.
+     *
+     * @throws Exception if an error occurs while performing this action.
+     */
+    public void postCommit() throws Exception {
+        for (Map.Entry<JmsMessageId, Delivery> entry : delivered.entrySet()) {
+            entry.getValue().settle();
+        }
+        delivered.clear();
+    }
+
+    /**
+     * Sends a REDELIVERED acknowledgement for every delivery held in the delivered
+     * Map and then empties the Map to prepare for the next TX to start.
+     *
+     * @throws Exception if an error occurs while performing this action.
+     */
+    public void postRollback() throws Exception {
+        for (Map.Entry<JmsMessageId, Delivery> entry : delivered.entrySet()) {
+            final Object context = entry.getValue().getContext();
+            acknowledge((JmsInboundMessageDispatch) context, ACK_TYPE.REDELIVERED);
+        }
+        delivered.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
new file mode 100644
index 0000000..a36e419
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.message.facade.JmsMessageFacade;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.jms.AutoOutboundTransformer;
+import org.apache.qpid.proton.jms.EncodedMessage;
+import org.apache.qpid.proton.jms.OutboundTransformer;
+import org.apache.qpid.proton.message.Message;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AMQP Producer object that is used to manage JMS MessageProducer semantics.
+ *
+ * This Producer is fixed to a given JmsDestination and can only produce 
messages to it.
+ */
+public class AmqpFixedProducer extends AmqpProducer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpFixedProducer.class);
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
+
+    private final AmqpTransferTagGenerator tagGenerator = new 
AmqpTransferTagGenerator(true);
+    private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
+    private final LinkedList<PendingSend> pendingSends = new 
LinkedList<PendingSend>();
+    private byte[] encodeBuffer = new byte[1024 * 8];
+
+    private final OutboundTransformer outboundTransformer = new 
AutoOutboundTransformer(AmqpJMSVendor.INSTANCE);
+    private final String MESSAGE_FORMAT_KEY = 
outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
+    private boolean presettle = false;
+
+    /**
+     * Creates a new fixed producer, handing both arguments on to the AmqpProducer base class.
+     *
+     * @param session
+     *        the AmqpSession that this producer is created in.
+     * @param info
+     *        the JmsProducerInfo describing this producer.
+     */
+    public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
+        super(session, info);
+    }
+
+    /**
+     * Closes this producer right away when no sends are being held, otherwise the
+     * request is parked until the held sends have been dispatched.
+     *
+     * @param request
+     *        the close request, passed to the superclass close once it can proceed.
+     */
+    @Override
+    public void close(AsyncResult request) {
+        if (pendingSends.isEmpty()) {
+            super.close(request);
+            return;
+        }
+
+        // Held sends must complete first, so only remember the request for now.
+        closeRequest = request;
+    }
+
+    /**
+     * Sends the message immediately when the link has credit, otherwise queues it until
+     * credit is available.
+     *
+     * @return true if the message was sent now, false if the send was held back.
+     */
+    @Override
+    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+        if (endpoint.getCredit() > 0) {
+            doSend(envelope, request);
+            return true;
+        }
+
+        // TODO - Handle the case where remote has no credit which means we can't send to it.
+        //        We need to hold the send until remote credit becomes available but we should
+        //        also have a send timeout option and filter timed out sends.
+        LOG.trace("Holding Message send until credit is available.");
+
+        // A held message can no longer be sent async, the flag is cleared so that the
+        // sender is not left waiting for a notification that never arrives.
+        envelope.setSendAsync(false);
+        pendingSends.addLast(new PendingSend(envelope, request));
+        return false;
+    }
+
+    /**
+     * Creates a delivery for the message, encodes and writes it to the sender link and
+     * then either settles it (presettled mode) or tracks it until the remote responds.
+     *
+     * @param envelope
+     *        the dispatch envelope holding the message to send.
+     * @param request
+     *        the request stored as the delivery context and completed here for async
+     *        or presettled sends.
+     */
+    private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+        JmsMessageFacade facade = envelope.getMessage().getFacade();
+
+        LOG.trace("Producer sending message: {}", facade.getMessageId());
+
+        byte[] tag = tagGenerator.getNextTag();
+
+        // Presettled sends use an empty tag, everything else gets the generated one.
+        Delivery delivery = presettle
+            ? endpoint.delivery(EMPTY_BYTE_ARRAY, 0, 0)
+            : endpoint.delivery(tag, 0, tag.length);
+        delivery.setContext(request);
+
+        if (session.isTransacted()) {
+            TransactionalState txState = new TransactionalState();
+            txState.setTxnId(session.getTransactionContext().getAmqpTransactionId());
+            delivery.disposition(txState);
+        }
+
+        JmsMessage message = envelope.getMessage();
+        message.setReadOnlyBody(true);
+
+        // TODO: why do we need this?
+        // Possibly because AMQP spec "2.7.5 Transfer" says message format MUST be set on at least
+        // the first Transfer frame of a message.  That is on the encoded Transfer frames though and
+        // this property isn't, but rather within the application-properties map.  We should probably
+        // ensure this elsewhere (appears Proton does so itself in TransportImpl#processTransportWorkSender)
+        if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
+            message.setProperty(MESSAGE_FORMAT_KEY, 0);
+        }
+
+        if (facade instanceof AmqpJmsMessageFacade) {
+            encodeAndSend(((AmqpJmsMessageFacade) facade).getAmqpMessage(), delivery);
+        } else {
+            encodeAndSend(message, delivery);
+        }
+
+        if (presettle) {
+            delivery.settle();
+            request.onSuccess();
+        } else {
+            pending.add(delivery);
+            endpoint.advance();
+            if (envelope.isSendAsync()) {
+                request.onSuccess();
+            }
+        }
+    }
+
+    /**
+     * Encodes the given Proton message into the reusable encode buffer and writes the
+     * encoded bytes to the sender link.
+     *
+     * @param message
+     *        the Proton message to encode.
+     * @param delivery
+     *        the delivery created for this send (not used by the encoding itself).
+     *
+     * @throws IOException if the sender link does not accept the encoded bytes.
+     */
+    private void encodeAndSend(Message message, Delivery delivery) throws IOException {
+        int encodedSize;
+        while (true) {
+            try {
+                encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
+                break;
+            } catch (java.nio.BufferOverflowException e) {
+                // The buffer is too small for this message, double it and try again.
+                encodeBuffer = new byte[encodeBuffer.length * 2];
+            }
+        }
+
+        sendFully(new Buffer(encodeBuffer, 0, encodedSize));
+    }
+
+    /**
+     * Transforms the given JMS message into its encoded AMQP form using the outbound
+     * transformer and writes the encoded bytes to the sender link.
+     *
+     * @param message
+     *        the JMS message to transform and send.
+     * @param delivery
+     *        the delivery created for this send (not used by the encoding itself).
+     *
+     * @throws IOException if the transformation fails, yields no encoded data, or the
+     *         sender link does not accept the encoded bytes.
+     */
+    private void encodeAndSend(JmsMessage message, Delivery delivery) throws IOException {
+        EncodedMessage amqp;
+        try {
+            amqp = outboundTransformer.transform(message);
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+
+        // Fail with a meaningful error instead of dereferencing a missing send buffer.
+        if (amqp == null || amqp.getLength() <= 0) {
+            throw new IOException(this + " could not encode the current Message, transformer produced no data.");
+        }
+
+        sendFully(new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength()));
+    }
+
+    /**
+     * Writes the whole content of the given buffer to the sender link, a buffer with no
+     * remaining content results in no write at all.
+     *
+     * @param sendBuffer
+     *        the bytes to write, its head is advanced as data is accepted.
+     *
+     * @throws IOException if the link accepts no data while bytes remain, which would
+     *         otherwise leave this method spinning forever.
+     */
+    private void sendFully(Buffer sendBuffer) throws IOException {
+        while (sendBuffer.length > 0) {
+            int sent = endpoint.send(sendBuffer.data, sendBuffer.offset, sendBuffer.length);
+            if (sent <= 0) {
+                throw new IOException(this + " failed to send any data from current Message.");
+            }
+            sendBuffer.moveHead(sent);
+        }
+    }
+
+    /**
+     * Dispatches held sends for as long as the link has credit, then completes a
+     * deferred close once no held sends remain.
+     *
+     * @throws IOException if a held send fails, JMSExceptions are converted to IOException.
+     */
+    @Override
+    public void processFlowUpdates() throws IOException {
+        while (!pendingSends.isEmpty() && endpoint.getCredit() > 0) {
+            LOG.trace("Dispatching previously held send");
+            PendingSend next = pendingSends.removeFirst();
+            try {
+                doSend(next.envelope, next.request);
+            } catch (JMSException e) {
+                throw IOExceptionSupport.create(e);
+            }
+        }
+
+        // Once the pending sends queue is drained we can propagate the close request.
+        if (pendingSends.isEmpty() && isAwaitingClose()) {
+            super.close(closeRequest);
+        }
+    }
+
+    @Override
+    public void processDeliveryUpdates() {
+        List<Delivery> toRemove = new ArrayList<Delivery>();
+
+        for (Delivery delivery : pending) {
+            DeliveryState state = delivery.getRemoteState();
+            if (state == null) {
+                continue;
+            }
+
+            Outcome outcome = null;
+            if (state instanceof TransactionalState) {
+                LOG.trace("State of delivery is Transactional, retrieving 
outcome: {}", state);
+                outcome = ((TransactionalState) state).getOutcome();
+            } else if (state instanceof Outcome) {
+                outcome = (Outcome) state;
+            } else {
+                LOG.warn("Message send updated with unsupported state: {}", 
state);
+                continue;
+            }
+
+            AsyncResult request = (AsyncResult) delivery.getContext();
+
+            if (outcome instanceof Accepted) {
+                toRemove.add(delivery);
+                LOG.trace("Outcome of delivery was accepted: {}", delivery);
+                tagGenerator.returnTag(delivery.getTag());
+                if (request != null && !request.isComplete()) {
+                    request.onSuccess();
+                }
+            } else if (outcome instanceof Rejected) {
+                Exception remoteError = getRemoteError();
+                toRemove.add(delivery);
+                LOG.trace("Outcome of delivery was rejected: {}", delivery);
+                tagGenerator.returnTag(delivery.getTag());
+                if (request != null && !request.isComplete()) {
+                    request.onFailure(remoteError);
+                } else {
+                    
connection.getProvider().fireProviderException(remoteError);
+                }
+            } else {
+                LOG.warn("Message send updated with unsupported outcome: {}", 
outcome);
+            }
+        }
+
+        pending.removeAll(toRemove);
+    }
+
+    /**
+     * Creates the Proton sender link for this producer.  The link source address is the
+     * producer id, the target address is the qualified name of the destination and the
+     * link name combines both as "producerId:destinationName".
+     */
+    @Override
+    protected void doOpen() {
+        JmsDestination destination = info.getDestination();
+
+        String destinationName = session.getQualifiedName(destination);
+        String sourceAddress = getProducerId().toString();
+        Source source = new Source();
+        source.setAddress(sourceAddress);
+        Target target = new Target();
+        target.setAddress(destinationName);
+
+        // The parentheses are required: string concatenation binds tighter than != and ?:,
+        // so without them the null check is applied to the whole concatenated string (never
+        // null) and the link name ends up being only the destination name.
+        String senderName = sourceAddress + ":" + (destinationName != null ? destinationName : "Anonymous");
+        endpoint = session.getProtonSession().sender(senderName);
+        endpoint.setSource(source);
+        endpoint.setTarget(target);
+        if (presettle) {
+            endpoint.setSenderSettleMode(SenderSettleMode.SETTLED);
+        } else {
+            endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        }
+        endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+    }
+
+    /**
+     * Close hook; this producer performs no additional work here.
+     */
+    @Override
+    protected void doClose() {
+        // Intentionally empty.
+    }
+
+    /**
+     * @return the AmqpSession that this producer was created with.
+     */
+    public AmqpSession getSession() {
+        return session;
+    }
+
+    /**
+     * @return the Proton Sender used as this producer's endpoint.
+     */
+    public Sender getProtonSender() {
+        return endpoint;
+    }
+
+    /**
+     * @return always false, this producer is fixed to a single destination.
+     */
+    @Override
+    public boolean isAnonymous() {
+        return false;
+    }
+
+    /**
+     * Sets whether messages are sent presettled; the value is stored in this class's
+     * own presettle field.
+     *
+     * @param presettle
+     *        true if all messages are sent settled.
+     */
+    @Override
+    public void setPresettle(boolean presettle) {
+        this.presettle = presettle;
+    }
+
+    /**
+     * @return the presettle value held by this class's own presettle field.
+     */
+    @Override
+    public boolean isPresettle() {
+        return presettle;
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpFixedProducer { " + getProducerId() + " }";
+    }
+
+    /**
+     * Immutable holder pairing a held message envelope with the request that tracks its send.
+     *
+     * Declared static since it never touches the enclosing producer, which avoids the
+     * hidden reference to the outer instance that an inner class would carry.
+     */
+    private static final class PendingSend {
+
+        public final JmsOutboundMessageDispatch envelope;
+        public final AsyncResult request;
+
+        public PendingSend(JmsOutboundMessageDispatch envelope, AsyncResult request) {
+            this.envelope = envelope;
+            this.request = request;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java
new file mode 100644
index 0000000..eba019e
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsQueue;
+import org.apache.qpid.jms.JmsTemporaryQueue;
+import org.apache.qpid.jms.JmsTemporaryTopic;
+import org.apache.qpid.jms.JmsTopic;
+import org.apache.qpid.jms.message.JmsDefaultMessageFactory;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.proton.jms.JMSVendor;
+
+/**
+ * Vendor instance used with Proton-J JMS Transformer bits.
+ *
+ * TODO - This can go once we have our own message wrappers all working.
+ */
+public class AmqpJMSVendor extends JMSVendor {
+
+    public static final AmqpJMSVendor INSTANCE = new AmqpJMSVendor();
+
+    private final JmsMessageFactory factory = new JmsDefaultMessageFactory();
+
+    private AmqpJMSVendor() {
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() {
+        try {
+            return factory.createBytesMessage();
+        } catch (JMSException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() {
+        try {
+            return factory.createStreamMessage();
+        } catch (JMSException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Message createMessage() {
+        try {
+            return factory.createMessage();
+        } catch (JMSException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public TextMessage createTextMessage() {
+        try {
+            return factory.createTextMessage();
+        } catch (JMSException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() {
+        try {
+            return factory.createObjectMessage();
+        } catch (JMSException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public MapMessage createMapMessage() {
+        try {
+            return factory.createMapMessage();
+        } catch (JMSException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Destination createDestination(String name) {
+        return super.createDestination(name, Destination.class);
+    }
+
+    @Override
+    public <T extends Destination> T createDestination(String name, Class<T> 
kind) {
+        if (kind == Queue.class) {
+            return kind.cast(new JmsQueue(name));
+        }
+        if (kind == Topic.class) {
+            return kind.cast(new JmsTopic(name));
+        }
+        if (kind == TemporaryQueue.class) {
+            return kind.cast(new JmsTemporaryQueue(name));
+        }
+        if (kind == TemporaryTopic.class) {
+            return kind.cast(new JmsTemporaryTopic(name));
+        }
+
+        return kind.cast(new JmsQueue(name));
+    }
+
+    @Override
+    public void setJMSXUserID(Message msg, String value) {
+        ((JmsMessage) msg).getFacade().setUserId(value);
+    }
+
+    @Override
+    public void setJMSXGroupID(Message msg, String value) {
+        ((JmsMessage) msg).getFacade().setGroupId(value);
+    }
+
+    @Override
+    public void setJMSXGroupSequence(Message msg, int value) {
+        ((JmsMessage) msg).getFacade().setGroupSequence(value);
+    }
+
+    @Override
+    public void setJMSXDeliveryCount(Message msg, long value) {
+        // Delivery count tracks total deliveries which is always one higher 
than
+        // re-delivery count since first delivery counts to.
+        ((JmsMessage) msg).getFacade().setRedeliveryCounter((int) (value == 0 
? value : value - 1));
+    }
+
+    @Override
+    public String toAddress(Destination dest) {
+        return ((JmsDestination) dest).getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java
new file mode 100644
index 0000000..24c6286
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * Described type that represents the JMS no local option of a MessageConsumer.
+ */
+public class AmqpJmsNoLocalType implements DescribedType {
+
+    /** Shared instance for callers that do not need their own. */
+    public static final AmqpJmsNoLocalType NO_LOCAL = new AmqpJmsNoLocalType();
+
+    private final String noLocal = "NoLocalFilter{}";
+
+    /**
+     * Creates a new no local type, the described value is always the same fixed string.
+     */
+    public AmqpJmsNoLocalType() {
+    }
+
+    /**
+     * @return the UnsignedLong descriptor 0x0000468C00000003 of this type.
+     */
+    @Override
+    public Object getDescriptor() {
+        return UnsignedLong.valueOf(0x0000468C00000003L);
+    }
+
+    /**
+     * @return the fixed described value of this type.
+     */
+    @Override
+    public Object getDescribed() {
+        return noLocal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java
new file mode 100644
index 0000000..021f943
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * Described type that carries a JMS selector value.
+ */
+public class AmqpJmsSelectorType implements DescribedType {
+
+    private final String selector;
+
+    /**
+     * @param selector
+     *        the selector string returned as the described value.
+     */
+    public AmqpJmsSelectorType(String selector) {
+        this.selector = selector;
+    }
+
+    /**
+     * @return the UnsignedLong descriptor 0x0000468C00000004 of this type.
+     */
+    @Override
+    public Object getDescriptor() {
+        return UnsignedLong.valueOf(0x0000468C00000004L);
+    }
+
+    /**
+     * @return the selector string given at construction.
+     */
+    @Override
+    public Object getDescribed() {
+        return selector;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
new file mode 100644
index 0000000..81ebf9e
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Base class for Producer instances.
+ */
+public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo, Sender> {
+
+    protected final AmqpSession session;
+    protected final AmqpConnection connection;
+    protected boolean presettle;
+
+    /**
+     * Creates the producer and registers it as the provider hint of its producer id.
+     *
+     * @param session
+     *        the session that owns this producer, its connection is cached as well.
+     * @param info
+     *        the JmsProducerInfo describing this producer.
+     */
+    public AmqpProducer(AmqpSession session, JmsProducerInfo info) {
+        super(info);
+        this.session = session;
+        this.connection = session.getConnection();
+
+        // Add a shortcut back to this Producer for quicker lookup.
+        getProducerId().setProviderHint(this);
+    }
+
+    /**
+     * Sends the given message.
+     *
+     * @param envelope
+     *        The envelope that contains the message and its targeted destination.
+     * @param request
+     *        The AsyncResult that will be notified on send success or failure.
+     *
+     * @return true if the producer had credit to send or false if there was no available
+     *         credit and the send needed to be deferred.
+     *
+     * @throws IOException if an error occurs sending the message
+     * @throws JMSException if an error occurs while preparing the message for send.
+     */
+    public abstract boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException;
+
+    /**
+     * @return true if this is an anonymous producer or false if fixed to a given destination.
+     */
+    public abstract boolean isAnonymous();
+
+    /**
+     * @return the JmsProducerId that was assigned to this AmqpProducer.
+     */
+    public JmsProducerId getProducerId() {
+        return info.getProducerId();
+    }
+
+    /**
+     * @return true if the producer should presettle all sent messages.
+     */
+    public boolean isPresettle() {
+        return this.presettle;
+    }
+
+    /**
+     * Sets whether the producer will presettle all messages that it sends.  Sending
+     * presettled reduces the time it takes to send a message but increases the chance
+     * of message loss should the connection drop during send.
+     *
+     * @param presettle
+     *        true if all messages are sent settled.
+     */
+    public void setPresettle(boolean presettle) {
+        this.presettle = presettle;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
new file mode 100644
index 0000000..aad47f1
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -0,0 +1,821 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsDefaultResourceVisitor;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsResourceVistor;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.meta.JmsTransactionInfo;
+import org.apache.qpid.jms.provider.AbstractProvider;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.transports.TcpTransport;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.jms.util.PropertyUtil;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Event.Type;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.buffer.Buffer;
+
+/**
+ * An AMQP v1.0 Provider.
+ *
+ * The AMQP Provider is bonded to a single remote broker instance.  The 
provider will attempt
+ * to connect to only that instance and once failed can not be recovered.  For 
clients that
+ * wish to implement failover type connections a new AMQP Provider instance 
must be created
+ * and state replayed from the JMS layer using the standard recovery process 
defined in the
+ * JMS Provider API.
+ *
+ * All work within this Provider is serialized to a single Thread.  Any 
asynchronous exceptions
+ * will be dispatched from that Thread and all in-bound requests are handled 
there as well.
+ */
+public class AmqpProvider extends AbstractProvider implements 
TransportListener {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpProvider.class);
+
+    private static final Logger TRACE_BYTES = 
LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + ".BYTES");
+    private static final Logger TRACE_FRAMES = 
LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + 
".FRAMES");
+    private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1;
+
+    private AmqpConnection connection;
+    private org.apache.qpid.jms.transports.Transport transport;
+    private boolean traceFrames;
+    private boolean traceBytes;
+    private boolean presettleConsumers;
+    private boolean presettleProducers;
+    private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
+    private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
+    private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
+    private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
+
+    private final Transport protonTransport = Transport.Factory.create();
+    private final Collector protonCollector = new CollectorImpl();
+
+    /**
+     * Create a new instance of an AmqpProvider bonded to the given remote URI.
+     * Delegates to the two argument constructor, passing null for the extra options.
+     *
+     * @param remoteURI
+     *        The URI of the AMQP broker this Provider instance will connect to.
+     */
+    public AmqpProvider(URI remoteURI) {
+        this(remoteURI, null);
+    }
+
+    /**
+     * Create a new instance of an AmqpProvider bonded to the given remote URI.
+     *
+     * @param remoteURI
+     *        The URI of the AMQP broker this Provider instance will connect to.
+     * @param extraOptions
+     *        Additional options for the provider; may be null.
+     *        NOTE(review): this constructor does not currently read the value - confirm
+     *        whether it is meant to be applied somewhere.
+     */
+    public AmqpProvider(URI remoteURI, Map<String, String> extraOptions) {
+        super(remoteURI);
+        // Installs the frame tracer only if frame tracing is already enabled.
+        updateTracer();
+    }
+
+    /**
+     * Creates the transport for the remote URI, applies every URI query option that
+     * carries the "transport." prefix to it and then asks the transport to connect.
+     *
+     * @throws IOException
+     *         if the URI query cannot be parsed, if any transport option cannot be set on
+     *         the transport, or if raised by checkClosed() or the transport's connect().
+     */
+    @Override
+    public void connect() throws IOException {
+        checkClosed();
+
+        transport = createTransport(getRemoteURI());
+
+        Map<String, String> map = Collections.emptyMap();
+        try {
+            map = PropertyUtil.parseQuery(remoteURI.getQuery());
+        } catch (Exception e) {
+            // The wrapped exception must actually be thrown; merely creating it would
+            // silently ignore a malformed query and connect with no options applied.
+            throw IOExceptionSupport.create(e);
+        }
+        Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "transport.");
+
+        if (!PropertyUtil.setProperties(transport, providerOptions)) {
+            String msg = ""
+                + " Not all transport options could be set on the AMQP Provider transport."
+                + " Check the options are spelled correctly."
+                + " Given parameters=[" + providerOptions + "]."
+                + " This provider instance cannot be started.";
+            throw new IOException(msg);
+        }
+
+        transport.connect();
+    }
+
+    @Override
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            final ProviderFuture request = new ProviderFuture();
+            serializer.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+
+                        // If we are not connected then there is nothing we 
can do now
+                        // just signal success.
+                        if (!transport.isConnected()) {
+                            request.onSuccess();
+                        }
+
+                        if (connection != null) {
+                            connection.close(request);
+                        } else {
+                            request.onSuccess();
+                        }
+
+                        pumpToProtonTransport();
+                    } catch (Exception e) {
+                        LOG.debug("Caught exception while closing proton 
connection");
+                    }
+                }
+            });
+
+            try {
+                if (closeTimeout < 0) {
+                    request.sync();
+                } else {
+                    request.sync(closeTimeout, TimeUnit.MILLISECONDS);
+                }
+            } catch (IOException e) {
+                LOG.warn("Error caught while closing Provider: ", 
e.getMessage());
+            } finally {
+                if (transport != null) {
+                    try {
+                        transport.close();
+                    } catch (Exception e) {
+                        LOG.debug("Cuaght exception while closing down 
Transport: {}", e.getMessage());
+                    }
+                }
+
+                if (serializer != null) {
+                    serializer.shutdown();
+                }
+            }
+        }
+    }
+
+    /**
+     * Queues a task on the serializer that creates the AMQP counterpart of the given
+     * resource and opens it, handing the request to the opened resource.
+     *
+     * Sessions, producers, consumers, the connection, temporary destinations and
+     * transactions are handled; a non-temporary destination is signalled successful
+     * immediately.  Any exception raised while processing is passed to the request's
+     * onFailure method.
+     *
+     * @param resource
+     *        the resource description to visit.
+     * @param request
+     *        the request to notify of the outcome.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     * @throws JMSException declared by the provider contract.
+     */
+    @Override
+    public void create(final JmsResource resource, final AsyncResult request) throws IOException, JMSException {
+        checkClosed();
+
+        final JmsResourceVistor creator = new JmsResourceVistor() {
+
+            @Override
+            public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
+                connection.createSession(sessionInfo).open(request);
+            }
+
+            @Override
+            public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
+                AmqpSession parent = connection.getSession(producerInfo.getParentId());
+                parent.createProducer(producerInfo).open(request);
+            }
+
+            @Override
+            public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+                AmqpSession parent = connection.getSession(consumerInfo.getParentId());
+                parent.createConsumer(consumerInfo).open(request);
+            }
+
+            @Override
+            public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
+                // Adopt the timeouts configured on the JMS connection.
+                closeTimeout = connectionInfo.getCloseTimeout();
+                connectTimeout = connectionInfo.getConnectTimeout();
+                sendTimeout = connectionInfo.getSendTimeout();
+                requestTimeout = connectionInfo.getRequestTimeout();
+
+                // Wire a fresh proton connection to the transport and event collector.
+                Connection endpoint = Connection.Factory.create();
+                protonTransport.setMaxFrameSize(getMaxFrameSize());
+                protonTransport.bind(endpoint);
+                endpoint.collect(protonCollector);
+
+                Sasl saslLayer = protonTransport.sasl();
+                if (saslLayer != null) {
+                    saslLayer.client();
+                }
+
+                connection = new AmqpConnection(AmqpProvider.this, endpoint, saslLayer, connectionInfo);
+                connection.open(request);
+            }
+
+            @Override
+            public void processDestination(JmsDestination destination) throws Exception {
+                if (!destination.isTemporary()) {
+                    request.onSuccess();
+                    return;
+                }
+
+                connection.createTemporaryDestination(destination).open(request);
+            }
+
+            @Override
+            public void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception {
+                AmqpSession parent = connection.getSession(transactionInfo.getParentId());
+                parent.begin(transactionInfo.getTransactionId(), request);
+            }
+        };
+
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+                    resource.visit(creator);
+                    pumpToProtonTransport();
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        });
+    }
+
+    /**
+     * Queues a task on the serializer that starts the given resource.  Only consumer
+     * resources are handled here: the matching AmqpConsumer is looked up through its
+     * parent session and started with the request.  Exceptions raised while processing
+     * are passed to the request's onFailure method.
+     *
+     * @param resource
+     *        the resource to start.
+     * @param request
+     *        the request to notify of the outcome.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     */
+    @Override
+    public void start(final JmsResource resource, final AsyncResult request) throws IOException {
+        checkClosed();
+
+        final JmsDefaultResourceVisitor starter = new JmsDefaultResourceVisitor() {
+
+            @Override
+            public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+                AmqpSession parent = connection.getSession(consumerInfo.getParentId());
+                parent.getConsumer(consumerInfo).start(request);
+            }
+        };
+
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+                    resource.visit(starter);
+                    pumpToProtonTransport();
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        });
+    }
+
+    /**
+     * Queues a task on the serializer that closes the AMQP counterpart of the given
+     * resource.  Sessions, producers, consumers and the connection are closed with the
+     * request; destinations are currently signalled successful without any remote
+     * action.  Exceptions raised while processing are passed to the request's onFailure
+     * method.
+     *
+     * @param resource
+     *        the resource to destroy.
+     * @param request
+     *        the request to notify of the outcome.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     */
+    @Override
+    public void destroy(final JmsResource resource, final AsyncResult request) throws IOException {
+        //TODO: improve or delete this logging
+        LOG.debug("Destroy called");
+
+        checkClosed();
+
+        final JmsDefaultResourceVisitor destroyer = new JmsDefaultResourceVisitor() {
+
+            @Override
+            public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
+                connection.getSession(sessionInfo.getSessionId()).close(request);
+            }
+
+            @Override
+            public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
+                AmqpSession parent = connection.getSession(producerInfo.getParentId());
+                parent.getProducer(producerInfo).close(request);
+            }
+
+            @Override
+            public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+                AmqpSession parent = connection.getSession(consumerInfo.getParentId());
+                parent.getConsumer(consumerInfo).close(request);
+            }
+
+            @Override
+            public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
+                connection.close(request);
+            }
+
+            @Override
+            public void processDestination(JmsDestination destination) throws Exception {
+                // TODO - Delete remote temporary Topic or Queue
+                request.onSuccess();
+            }
+        };
+
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    //TODO: improve or delete this logging
+                    LOG.debug("Processing resource destroy request");
+                    checkClosed();
+                    resource.visit(destroyer);
+                    pumpToProtonTransport();
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        });
+    }
+
+    /**
+     * Queues a task on the serializer that sends the given message envelope.
+     *
+     * The producer is taken from the producer id's provider hint when that hint is an
+     * AmqpFixedProducer, otherwise it is looked up through the parent session.  The
+     * request is signalled successful here only when the producer reports that it could
+     * send and the envelope is marked as an async send.  Exceptions raised while
+     * processing are passed to the request's onFailure method.
+     *
+     * @param envelope
+     *        the outbound message dispatch to send.
+     * @param request
+     *        the request to notify of the outcome.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     */
+    @Override
+    public void send(final JmsOutboundMessageDispatch envelope, final AsyncResult request) throws IOException {
+        checkClosed();
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+
+                    final JmsProducerId producerId = envelope.getProducerId();
+                    final Object hint = producerId.getProviderHint();
+
+                    final AmqpProducer target;
+                    if (hint instanceof AmqpFixedProducer) {
+                        target = (AmqpFixedProducer) hint;
+                    } else {
+                        target = connection.getSession(producerId.getParentId()).getProducer(producerId);
+                    }
+
+                    final boolean sent = target.send(envelope, request);
+                    pumpToProtonTransport();
+                    if (sent && envelope.isSendAsync()) {
+                        request.onSuccess();
+                    }
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        });
+    }
+
+    /**
+     * Queues a task on the serializer that invokes acknowledge() on the identified
+     * session, flushes pending output and then signals the request successful.
+     * Exceptions raised while processing are passed to the request's onFailure method.
+     *
+     * @param sessionId
+     *        the id of the session to acknowledge.
+     * @param request
+     *        the request to notify of the outcome.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     */
+    @Override
+    public void acknowledge(final JmsSessionId sessionId, final AsyncResult request) throws IOException {
+        checkClosed();
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+                    connection.getSession(sessionId).acknowledge();
+                    pumpToProtonTransport();
+                    request.onSuccess();
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        });
+    }
+
+    /**
+     * Queues a task on the serializer that acknowledges a single inbound message with
+     * the given acknowledgement type.
+     *
+     * The consumer is taken from the consumer id's provider hint when that hint is an
+     * AmqpConsumer, otherwise it is looked up through the parent session.  When the
+     * consumer's session uses async acks the request is signalled successful before the
+     * pending output is flushed, otherwise after.  Exceptions raised while processing are
+     * passed to the request's onFailure method.
+     *
+     * @param envelope
+     *        the inbound message dispatch being acknowledged.
+     * @param ackType
+     *        the type of acknowledgement to apply.
+     * @param request
+     *        the request to notify of the outcome.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     */
+    @Override
+    public void acknowledge(final JmsInboundMessageDispatch envelope, final ACK_TYPE ackType, final AsyncResult request) throws IOException {
+        checkClosed();
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+
+                    final JmsConsumerId consumerId = envelope.getConsumerId();
+                    final Object hint = consumerId.getProviderHint();
+
+                    final AmqpConsumer target;
+                    if (hint instanceof AmqpConsumer) {
+                        target = (AmqpConsumer) hint;
+                    } else {
+                        target = connection.getSession(consumerId.getParentId()).getConsumer(consumerId);
+                    }
+
+                    target.acknowledge(envelope, ackType);
+
+                    final boolean asyncAck = target.getSession().isAsyncAck();
+                    if (asyncAck) {
+                        request.onSuccess();
+                    }
+                    pumpToProtonTransport();
+                    if (!asyncAck) {
+                        request.onSuccess();
+                    }
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        });
+    }
+
+    /**
+     * Queues a task on the serializer that asks the identified session to commit, handing
+     * it the request, and then flushes pending output.  Exceptions raised while processing
+     * are passed to the request's onFailure method.
+     *
+     * @param sessionId
+     *        the id of the session to commit.
+     * @param request
+     *        the request handed to the session's commit.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     */
+    @Override
+    public void commit(final JmsSessionId sessionId, final AsyncResult request) throws IOException {
+        checkClosed();
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+                    connection.getSession(sessionId).commit(request);
+                    pumpToProtonTransport();
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        });
+    }
+
+    /**
+     * Queues a task on the serializer that asks the identified session to roll back,
+     * handing it the request, and then flushes pending output.  Exceptions raised while
+     * processing are passed to the request's onFailure method.
+     *
+     * @param sessionId
+     *        the id of the session to roll back.
+     * @param request
+     *        the request handed to the session's rollback.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     */
+    @Override
+    public void rollback(final JmsSessionId sessionId, final AsyncResult request) throws IOException {
+        checkClosed();
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+                    connection.getSession(sessionId).rollback(request);
+                    pumpToProtonTransport();
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        });
+    }
+
+    /**
+     * Queues a task on the serializer that invokes recover() on the identified session,
+     * flushes pending output and then signals the request successful.  Exceptions raised
+     * while processing are passed to the request's onFailure method.
+     *
+     * @param sessionId
+     *        the id of the session to recover.
+     * @param request
+     *        the request to notify of the outcome.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     */
+    @Override
+    public void recover(final JmsSessionId sessionId, final AsyncResult request) throws IOException {
+        checkClosed();
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+                    connection.getSession(sessionId).recover();
+                    pumpToProtonTransport();
+                    request.onSuccess();
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        });
+    }
+
+    /**
+     * Queues a task on the serializer that flushes pending output and signals the request
+     * successful.  The subscription name is not used by this implementation.  Exceptions
+     * raised while processing are passed to the request's onFailure method.
+     *
+     * @param subscription
+     *        the name of the subscription (currently unused).
+     * @param request
+     *        the request to notify of the outcome.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     */
+    @Override
+    public void unsubscribe(final String subscription, final AsyncResult request) throws IOException {
+        checkClosed();
+
+        final Runnable task = new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+                    pumpToProtonTransport();
+                    request.onSuccess();
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        };
+
+        serializer.execute(task);
+    }
+
+    /**
+     * Queues a task on the serializer that invokes pull(timeout) on the identified
+     * consumer, flushes pending output and then signals the request successful.
+     *
+     * The consumer is taken from the consumer id's provider hint when that hint is an
+     * AmqpConsumer, otherwise it is looked up through the parent session.  Exceptions
+     * raised while processing are passed to the request's onFailure method.
+     *
+     * @param consumerId
+     *        the id of the consumer to pull on.
+     * @param timeout
+     *        the timeout value handed to the consumer's pull.
+     * @param request
+     *        the request to notify of the outcome.
+     *
+     * @throws IOException declared by the provider contract; checkClosed() is invoked first.
+     */
+    @Override
+    public void pull(final JmsConsumerId consumerId, final long timeout, final AsyncResult request) throws IOException {
+        checkClosed();
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    checkClosed();
+
+                    final Object hint = consumerId.getProviderHint();
+                    final AmqpConsumer target;
+                    if (hint instanceof AmqpConsumer) {
+                        target = (AmqpConsumer) hint;
+                    } else {
+                        target = connection.getSession(consumerId.getParentId()).getConsumer(consumerId);
+                    }
+
+                    target.pull(timeout);
+                    pumpToProtonTransport();
+                    request.onSuccess();
+                } catch (Exception failure) {
+                    request.onFailure(failure);
+                }
+            }
+        });
+    }
+
+    /**
+     * Provides an extension point for subclasses to insert other types of 
transports such
+     * as SSL etc.
+     *
+     * @param remoteLocation
+     *        The remote location where the transport should attempt to 
connect.
+     *
+     * @return the newly created transport instance.
+     */
+    protected org.apache.qpid.jms.transports.Transport createTransport(URI 
remoteLocation) {
+        return new TcpTransport(this, remoteLocation);
+    }
+
+    private void updateTracer() {
+        if (isTraceFrames()) {
+            ((TransportImpl) protonTransport).setProtocolTracer(new 
ProtocolTracer() {
+                @Override
+                public void receivedFrame(TransportFrame transportFrame) {
+                    TRACE_FRAMES.trace("RECV: {}", transportFrame.getBody());
+                }
+
+                @Override
+                public void sentFrame(TransportFrame transportFrame) {
+                    TRACE_FRAMES.trace("SENT: {}", transportFrame.getBody());
+                }
+            });
+        }
+    }
+
+    /**
+     * Transport callback for incoming bytes.  The bytes are wrapped in a ByteBuffer and
+     * a task is queued on the serializer that feeds them to the proton transport in
+     * chunks no larger than the space proton's input buffer offers, processing input
+     * after each chunk.  Once everything has been handed over the resulting proton
+     * events are processed and any pending output is written back to the transport.
+     *
+     * @param input
+     *        the buffer of bytes read from the remote peer.
+     */
+    @Override
+    public void onData(Buffer input) {
+
+        // Create our own copy since we will process later.
+        final ByteBuffer source = ByteBuffer.wrap(input.getBytes());
+
+        final Runnable feeder = new Runnable() {
+
+            @Override
+            public void run() {
+                LOG.trace("Received from Broker {} bytes:", source.remaining());
+
+                do {
+                    final ByteBuffer protonInput = protonTransport.getInputBuffer();
+                    final int chunk = Math.min(protonInput.remaining(), source.remaining());
+
+                    // Hand over a bounded view so at most 'chunk' bytes are copied.
+                    final ByteBuffer view = source.duplicate();
+                    view.limit(source.position() + chunk);
+                    protonInput.put(view);
+
+                    protonTransport.processInput();
+                    source.position(source.position() + chunk);
+                } while (source.hasRemaining());
+
+                // Process the state changes from the latest data and then answer back
+                // any pending updates to the Broker.
+                processUpdates();
+                pumpToProtonTransport();
+            }
+        };
+
+        serializer.execute(feeder);
+    }
+
+    /**
+     * Callback method for the Transport to report connection errors.  Unless the
+     * serializer has been shut down, a task is queued that logs the failure and, if this
+     * provider has not been closed, fires the error to the listener and marks the
+     * connection (when one exists) as closed.
+     *
+     * @param error
+     *        the error that causes the transport to fail.
+     */
+    @Override
+    public void onTransportError(final Throwable error) {
+        if (serializer.isShutdown()) {
+            return;
+        }
+
+        serializer.execute(new Runnable() {
+            @Override
+            public void run() {
+                LOG.info("Transport failed: {}", error.getMessage());
+                if (closed.get()) {
+                    return;
+                }
+
+                fireProviderException(error);
+                if (connection != null) {
+                    connection.closed();
+                }
+            }
+        });
+    }
+
+    /**
+     * Callback method for the Transport to report that the underlying connection has
+     * closed.  Unless the serializer has been shut down, a task is queued that checks
+     * whether this provider has been closed; if it has not, an IOException is raised to
+     * the registered listener to indicate connection loss and the connection (when one
+     * exists) is marked as closed.
+     */
+    @Override
+    public void onTransportClosed() {
+        // TODO: improve or delete this logging
+        LOG.debug("onTransportClosed listener called");
+        if (serializer.isShutdown()) {
+            return;
+        }
+
+        serializer.execute(new Runnable() {
+            @Override
+            public void run() {
+                LOG.debug("Transport connection remotely closed");
+                if (closed.get()) {
+                    return;
+                }
+
+                fireProviderException(new IOException("Connection remotely closed."));
+                if (connection != null) {
+                    connection.closed();
+                }
+            }
+        });
+    }
+
+    /**
+     * Drains the proton event collector, dispatching each event to the object stored as
+     * the context of the affected connection, session or link:
+     * remote open/close events trigger processStateChange(), link flow events trigger
+     * processFlowUpdates() and delivery events trigger processDeliveryUpdates().  All
+     * other event types are discarded.  Afterwards SASL authentication is processed on
+     * the connection, when one exists.
+     *
+     * Any exception is logged and fired to the provider listener.
+     */
+    private void processUpdates() {
+        try {
+            for (Event event = protonCollector.peek(); event != null; event = protonCollector.peek()) {
+                if (!event.getType().equals(Type.TRANSPORT)) {
+                    LOG.trace("New Proton Event: {}", event.getType());
+                }
+
+                switch (event.getType()) {
+                    case CONNECTION_REMOTE_CLOSE:
+                    case CONNECTION_REMOTE_OPEN: {
+                        AmqpConnection amqpConnection = (AmqpConnection) event.getConnection().getContext();
+                        amqpConnection.processStateChange();
+                        break;
+                    }
+                    case SESSION_REMOTE_CLOSE:
+                    case SESSION_REMOTE_OPEN: {
+                        AmqpSession amqpSession = (AmqpSession) event.getSession().getContext();
+                        amqpSession.processStateChange();
+                        break;
+                    }
+                    case LINK_REMOTE_CLOSE:
+                    case LINK_REMOTE_OPEN: {
+                        AmqpResource linkOwner = (AmqpResource) event.getLink().getContext();
+                        linkOwner.processStateChange();
+                        break;
+                    }
+                    case LINK_FLOW: {
+                        AmqpResource linkOwner = (AmqpResource) event.getLink().getContext();
+                        linkOwner.processFlowUpdates();
+                        break;
+                    }
+                    case DELIVERY: {
+                        AmqpResource linkOwner = (AmqpResource) event.getLink().getContext();
+                        linkOwner.processDeliveryUpdates();
+                        break;
+                    }
+                    default:
+                        break;
+                }
+
+                // The event was only peeked at above; remove it now that it is handled.
+                protonCollector.pop();
+            }
+
+            // We have to do this to pump SASL bytes in as SASL is not event driven yet.
+            if (connection != null) {
+                connection.processSaslAuthentication();
+            }
+        } catch (Exception ex) {
+            LOG.warn("Caught Exception during update processing: {}", ex.getMessage(), ex);
+            fireProviderException(ex);
+        }
+    }
+
+    private void pumpToProtonTransport() {
+        try {
+            boolean done = false;
+            while (!done) {
+                ByteBuffer toWrite = protonTransport.getOutputBuffer();
+                if (toWrite != null && toWrite.hasRemaining()) {
+                    // TODO - Get Bytes in a readable form
+                    if (isTraceBytes()) {
+                        TRACE_BYTES.info("Sending: {}", toWrite.toString());
+                    }
+                    transport.send(toWrite);
+                    protonTransport.outputConsumed();
+                } else {
+                    done = true;
+                }
+            }
+        } catch (IOException e) {
+            fireProviderException(e);
+        }
+    }
+
+    //---------- Property Setters and Getters 
--------------------------------//
+
+    /**
+     * Returns the message factory of the current AMQP connection.
+     *
+     * @return the connection's AMQP message factory.
+     *
+     * @throws RuntimeException if no connection has been created yet.
+     */
+    @Override
+    public JmsMessageFactory getMessageFactory() {
+        if (connection != null) {
+            return connection.getAmqpMessageFactory();
+        }
+
+        throw new RuntimeException("Message Factory is not accessible when not connected.");
+    }
+
+    /**
+     * Enables or disables frame tracing and then refreshes the protocol tracer.
+     *
+     * @param trace
+     *        true to trace AMQP frames.
+     */
+    public void setTraceFrames(boolean trace) {
+        traceFrames = trace;
+        updateTracer();
+    }
+
+    /**
+     * @return true if frame tracing has been enabled on this provider.
+     */
+    public boolean isTraceFrames() {
+        return traceFrames;
+    }
+
+    /**
+     * Enables or disables logging of the outgoing byte buffers.
+     *
+     * @param trace
+     *        true to log each buffer written to the transport.
+     */
+    public void setTraceBytes(boolean trace) {
+        traceBytes = trace;
+    }
+
+    /**
+     * @return true if byte tracing has been enabled on this provider.
+     */
+    public boolean isTraceBytes() {
+        return traceBytes;
+    }
+
+    /**
+     * @return the time in milliseconds that close() waits for the close request to
+     *         complete; a negative value makes close() wait without limit.
+     */
+    public long getCloseTimeout() {
+        return closeTimeout;
+    }
+
+    /**
+     * Sets how long close() waits for the close request to complete.
+     *
+     * @param closeTimeout
+     *        the wait time in milliseconds; a negative value makes close() wait without limit.
+     */
+    public void setCloseTimeout(long closeTimeout) {
+        this.closeTimeout = closeTimeout;
+    }
+
+    /**
+     * @return the configured connect timeout value.
+     */
+    public long getConnectTimeout() {
+        return this.connectTimeout;
+    }
+
+    /**
+     * Sets the connect timeout value held by this provider.
+     *
+     * @param connectTimeout
+     *        the new connect timeout value.
+     */
+    public void setConnectTimeout(long connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+
+    /**
+     * @return the configured request timeout value.
+     */
+    public long getRequestTimeout() {
+        return this.requestTimeout;
+    }
+
+    /**
+     * Sets the request timeout value held by this provider.
+     *
+     * @param requestTimeout
+     *        the new request timeout value.
+     */
+    public void setRequestTimeout(long requestTimeout) {
+        this.requestTimeout = requestTimeout;
+    }
+
+    /**
+     * @return the configured send timeout value.
+     */
+    public long getSendTimeout() {
+        return this.sendTimeout;
+    }
+
+    /**
+     * Sets the send timeout value held by this provider.
+     *
+     * @param sendTimeout
+     *        the new send timeout value.
+     */
+    public void setSendTimeout(long sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+
+    /**
+     * Convenience setter that applies the same presettle value to both the consumer
+     * and the producer presettle flags.
+     *
+     * @param presettle
+     *        the value to apply to both flags.
+     */
+    public void setPresettle(boolean presettle) {
+        this.setPresettleConsumers(presettle);
+        this.setPresettleProducers(presettle);
+    }
+
+    /**
+     * @return the current value of the consumer presettle flag.
+     */
+    public boolean isPresettleConsumers() {
+        return presettleConsumers;
+    }
+
+    /**
+     * Sets the consumer presettle flag.
+     *
+     * @param presettle
+     *        the new value of the consumer presettle flag.
+     */
+    public void setPresettleConsumers(boolean presettle) {
+        presettleConsumers = presettle;
+    }
+
+    /**
+     * @return the current value of the producer presettle flag.
+     */
+    public boolean isPresettleProducers() {
+        return presettleProducers;
+    }
+
+    /**
+     * Sets the producer presettle flag.
+     *
+     * @param presettle
+     *        the new value of the producer presettle flag.
+     */
+    public void setPresettleProducers(boolean presettle) {
+        presettleProducers = presettle;
+    }
+
+    /**
+     * Returns the maximum frame size that is applied to the proton transport when the
+     * connection is created.  This implementation always returns the
+     * DEFAULT_MAX_FRAME_SIZE constant.
+     *
+     * @return the currently set Max Frame Size value.
+     */
+    public int getMaxFrameSize() {
+        return DEFAULT_MAX_FRAME_SIZE;
+    }
+
+    /**
+     * @return a description of this provider made up of the host and port of the remote URI.
+     */
+    @Override
+    public String toString() {
+        final URI remote = getRemoteURI();
+        return "AmqpProvider: " + remote.getHost() + ":" + remote.getPort();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to