http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
new file mode 100644
index 0000000..4ebf21a
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * Lifecycle states of an {@code AmqpTransport}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum TransportState {
+    /** Initial state of a freshly constructed transport. */
+    CREATED,
+    /** A client side connect has been started but has not completed yet. */
+    CONNECTING,
+    /** The underlying transport is connected (also the state of an accepted transport). */
+    CONNECTED,
+    /** A disconnect was requested and the underlying transport is being stopped. */
+    DISCONNECTING,
+    /** The underlying transport has been stopped. */
+    DISCONNECTED
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
new file mode 100644
index 0000000..de8a2cd
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * Mutable view over an 8 byte AMQP protocol header: the bytes
+ * {@code 'A','M','Q','P'} followed by the protocol id, major, minor and
+ * revision bytes.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpHeader {
+
+    static final Buffer PREFIX = new Buffer(new byte[]{
+      'A', 'M', 'Q', 'P'
+    });
+
+    // Positions of the individual fields, relative to the start of the header.
+    private static final int PROTOCOL_ID_INDEX = 4;
+    private static final int MAJOR_INDEX = 5;
+    private static final int MINOR_INDEX = 6;
+    private static final int REVISION_INDEX = 7;
+
+    private Buffer buffer;
+
+    /** Creates a header with protocol id 0, major 1, minor 0 and revision 0. */
+    public AmqpHeader(){
+        this(new Buffer(new byte[]{
+          'A', 'M', 'Q', 'P', 0, 1, 0, 0
+        }));
+    }
+
+    /**
+     * Creates a header backed by the given buffer.
+     *
+     * @param buffer the header bytes, validated by {@link #setBuffer(Buffer)}
+     * @throws IllegalArgumentException if the buffer is not an AMQP header
+     */
+    public AmqpHeader(Buffer buffer){
+        setBuffer(buffer);
+    }
+
+    /** @return the protocol id byte as an unsigned value */
+    public int getProtocolId() {
+        return unsignedByteAt(PROTOCOL_ID_INDEX);
+    }
+    /** @param value new protocol id; only the low 8 bits are stored */
+    public void setProtocolId(int value) {
+        storeByteAt(PROTOCOL_ID_INDEX, value);
+    }
+
+    /** @return the major version byte as an unsigned value */
+    public int getMajor() {
+        return unsignedByteAt(MAJOR_INDEX);
+    }
+    /** @param value new major version; only the low 8 bits are stored */
+    public void setMajor(int value) {
+        storeByteAt(MAJOR_INDEX, value);
+    }
+
+    /** @return the minor version byte as an unsigned value */
+    public int getMinor() {
+        return unsignedByteAt(MINOR_INDEX);
+    }
+    /** @param value new minor version; only the low 8 bits are stored */
+    public void setMinor(int value) {
+        storeByteAt(MINOR_INDEX, value);
+    }
+
+    /** @return the revision byte as an unsigned value */
+    public int getRevision() {
+        return unsignedByteAt(REVISION_INDEX);
+    }
+    /** @param value new revision; only the low 8 bits are stored */
+    public void setRevision(int value) {
+        storeByteAt(REVISION_INDEX, value);
+    }
+
+    /** @return the buffer holding the header bytes */
+    public Buffer getBuffer() {
+        return buffer;
+    }
+
+    /**
+     * Replaces the backing buffer.
+     *
+     * @param value a buffer of exactly 8 bytes that starts with {@code "AMQP"}
+     * @throws IllegalArgumentException if the prefix or the length does not match
+     */
+    public void setBuffer(Buffer value) {
+        final boolean wellFormed = value.startsWith(PREFIX) && value.length() == 8;
+        if( !wellFormed ) {
+            throw new IllegalArgumentException("Not an AMQP header buffer");
+        }
+        buffer = value.buffer();
+    }
+
+    @Override
+    public String toString() {
+        return buffer.toString();
+    }
+
+    /** Reads the header byte at the given index as an unsigned value. */
+    private int unsignedByteAt(int index) {
+        return buffer.get(index) & 0xFF;
+    }
+
+    /** Writes the low 8 bits of value straight into the backing array. */
+    private void storeByteAt(int index, int value) {
+        buffer.data[buffer.offset + index] = (byte) value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
new file mode 100644
index 0000000..f372d99
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.*;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+
+
+/**
+* @author <a href="http://hiramchirino.com";>Hiram Chirino</a>
+*/
+public class AmqpListener {
+
+    public Sasl processSaslConnect(ProtonJTransport protonTransport) {
+        return null;
+    }
+
+    public Sasl processSaslEvent(Sasl sasl) {
+        return sasl;
+    }
+
+    public void processRemoteOpen(Endpoint endpoint, Task onComplete) {
+        ErrorCondition condition = endpoint.getCondition();
+        condition.setCondition(Symbol.valueOf("error"));
+        condition.setDescription("Not supported");
+        endpoint.close();
+        onComplete.run();
+    }
+
+    public void processRemoteClose(Endpoint endpoint, Task onComplete) {
+        endpoint.close();
+        onComplete.run();
+    }
+
+    public void processDelivery(Delivery delivery){
+    }
+
+    public void processTransportConnected() {
+    }
+
+    public void processTransportFailure(IOException e) {
+        this.processFailure(e);
+    }
+
+    public void processFailure(Throwable e) {
+        e.printStackTrace();
+    }
+
+    public void processRefill() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
new file mode 100644
index 0000000..13ed1e3
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;
+
+import java.io.IOException;
+
+/**
+ * A HawtDispatch protocol codec that encodes/decodes AMQP 1.0 frames.
+ * <p>
+ * Decoding first yields an {@link AmqpHeader} for the 8 byte protocol header
+ * and afterwards one {@link Buffer} per frame (including its 4 byte size prefix).
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpProtocolCodec extends AbstractProtocolCodec {
+
+    /** Smallest frame size accepted by the decoder. */
+    private static final int MIN_FRAME_SIZE = 8;
+
+    /** Largest frame size accepted by the decoder; defaults to 4 MiB. */
+    int maxFrameSize = 4*1024*1024;
+
+    /** Writes the given {@link Buffer} verbatim to the outgoing write buffer. */
+    @Override
+    protected void encode(Object object) throws IOException {
+        nextWriteBuffer.write((Buffer) object);
+    }
+
+    /**
+     * @return an action that reads the 8 byte protocol header and then switches
+     *         the decoder to frame mode
+     */
+    @Override
+    protected Action initialDecodeAction() {
+        return new Action() {
+            public Object apply() throws IOException {
+                Buffer magic = readBytes(8);
+                if (magic == null) {
+                    // not enough bytes buffered yet
+                    return null;
+                }
+                final AmqpHeader header;
+                try {
+                    header = new AmqpHeader(magic);
+                } catch (IllegalArgumentException e) {
+                    // Bytes from the wire are untrusted: report a malformed header
+                    // as a decode error instead of leaking an unchecked exception.
+                    IOException failure = new IOException("Invalid AMQP protocol header: " + magic);
+                    failure.initCause(e);
+                    throw failure;
+                }
+                nextDecodeAction = readFrameSize;
+                return header;
+            }
+        };
+    }
+
+    /** Peeks the 4 byte big endian frame size, validates it and then reads the whole frame. */
+    private final Action readFrameSize = new Action() {
+        public Object apply() throws IOException {
+            Buffer sizeBytes = peekBytes(4);
+            if (sizeBytes == null) {
+                return null;
+            }
+            int size = sizeBytes.bigEndianEditor().readInt();
+            if (size < 0) {
+                // The wire value is an unsigned 32 bit integer; a negative int
+                // therefore denotes a size above Integer.MAX_VALUE.
+                throw new IOException(String.format("specified frame size %d is larger than maximum frame size", size & 0xFFFFFFFFL));
+            }
+            if (size < MIN_FRAME_SIZE) {
+                throw new IOException(String.format("specified frame size %d is smaller than minimum frame size", size));
+            }
+            if( size > maxFrameSize ) {
+                throw new IOException(String.format("specified frame size %d is larger than maximum frame size", size));
+            }
+
+            nextDecodeAction = readFrame(size);
+            return nextDecodeAction.apply();
+        }
+    };
+
+
+    /** Creates an action that reads a frame of exactly size bytes and then goes back to reading sizes. */
+    private Action readFrame(final int size) {
+        return new Action() {
+            public Object apply() throws IOException {
+                Buffer frameData = readBytes(size);
+                if (frameData != null) {
+                    nextDecodeAction = readFrameSize;
+                    return frameData;
+                } else {
+                    return null;
+                }
+            }
+        };
+    }
+
+    /** @return the number of bytes that were read but not yet handed out by a decode action */
+    public int getReadBytesPendingDecode() {
+        return readBuffer.position() - readStart;
+    }
+
+    /** Makes the decoder expect frames right away, without a protocol header. */
+    public void skipProtocolHeader() {
+        nextDecodeAction = readFrameSize;
+    }
+
+    /** Makes the decoder expect a protocol header next. */
+    public void readProtocolHeader() {
+        nextDecodeAction = initialDecodeAction();
+    }
+
+    public int getMaxFrameSize() {
+        return maxFrameSize;
+    }
+
+    public void setMaxFrameSize(int maxFrameSize) {
+        this.maxFrameSize = maxFrameSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
new file mode 100644
index 0000000..9ea048b
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
@@ -0,0 +1,587 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.hawtdispatch.api.AmqpConnectOptions;
+import org.apache.qpid.proton.hawtdispatch.api.Callback;
+import org.apache.qpid.proton.hawtdispatch.api.ChainedCallback;
+import org.apache.qpid.proton.hawtdispatch.api.TransportState;
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.ByteBufferUtils;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.*;
+import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
+import org.fusesource.hawtdispatch.transport.SslTransport;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+import org.fusesource.hawtdispatch.transport.Transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedList;
+
+import static org.apache.qpid.proton.hawtdispatch.api.TransportState.*;
+import static org.fusesource.hawtdispatch.Dispatch.NOOP;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpTransport extends WatchBase {
+
+    // Current lifecycle state; starts as CREATED and is advanced by
+    // connecting(), accepted() and disconnect().
+    private TransportState state = CREATED;
+
+    // Dispatch queue handed to the constructor; the defers source targets it.
+    final DispatchQueue queue;
+    // Proton connection created in the constructor.
+    final ProtonJConnection connection;
+    // Underlying HawtDispatch transport; assigned in bind() and cleared once
+    // a disconnect has completed.
+    Transport hawtdispatchTransport;
+    // Proton transport bound to connection; assigned in bind() and cleared
+    // once a disconnect has completed.
+    ProtonJTransport protonTransport;
+    // Most recently recorded failure, or null; consulted by the watches.
+    Throwable failure;
+    // Dispatch source that collects Defer tasks in a linked list and runs
+    // them from its event handler.
+    CustomDispatchSource<Defer,LinkedList<Defer>> defers;
+
+    // Every EndpointState value; used as both the local and the remote state
+    // filter when walking sessions and links.
+    public static final EnumSet<EndpointState> ALL_SET = 
EnumSet.allOf(EndpointState.class);
+
+    /**
+     * Creates a transport whose work runs on the given queue and sets up the
+     * dispatch source that executes deferred tasks.
+     *
+     * @param queue the dispatch queue this transport runs on
+     */
+    private AmqpTransport(DispatchQueue queue) {
+        this.queue = queue;
+        this.connection = (ProtonJConnection) Connection.Factory.create();
+
+        defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue);
+        defers.setEventHandler(new Task(){
+            public void run() {
+                for( Defer defer: defers.getData() ) {
+                    // defer(Defer) sets the flag before merging the task, so it must
+                    // be set here.  The former "assert defer.defered = true" assigned
+                    // the flag instead of checking it.
+                    assert defer.defered : "deferred task was queued without its flag set";
+                    // Clear the flag first so the task can re-schedule itself while running.
+                    defer.defered = false;
+                    defer.run();
+                }
+            }
+        });
+        defers.resume();
+    }
+
+    /**
+     * Starts a client side connection.  The options are cloned first; a
+     * missing dispatch queue or blocking executor is filled in on the clone.
+     *
+     * @param options the connect options (not modified)
+     * @return the new transport, already in the process of connecting
+     */
+    public static AmqpTransport connect(AmqpConnectOptions options) {
+        final AmqpConnectOptions effective = options.clone();
+        if( effective.getDispatchQueue() == null ) {
+            effective.setDispatchQueue(Dispatch.createQueue());
+        }
+        if( effective.getBlockingExecutor() == null ) {
+            effective.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool());
+        }
+        final AmqpTransport created = new AmqpTransport(effective.getDispatchQueue());
+        return created.connecting(effective);
+    }
+
+    /**
+     * Moves a freshly created transport into the CONNECTING state, copies the
+     * container ids and host name onto the proton connection and starts the
+     * underlying transport.  Any Throwable raised while doing so is recorded
+     * in {@code failure} instead of being propagated.
+     *
+     * @param options the (already cloned) connect options
+     * @return this transport
+     */
+    private AmqpTransport connecting(final AmqpConnectOptions options) {
+        assert state == CREATED;
+        try {
+            state = CONNECTING;
+            if( options.getLocalContainerId()!=null ) {
+                connection.setLocalContainerId(options.getLocalContainerId());
+            }
+            if( options.getRemoteContainerId()!=null ) {
+                connection.setContainer(options.getRemoteContainerId());
+            }
+            connection.setHostname(options.getHost().getHost());
+            // Final step of the connect sequence: install the regular listener
+            // on success, or record the failure and disconnect.
+            Callback<Void> onConnect = new Callback<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    if( state == CONNECTED ) {
+                        hawtdispatchTransport.setTransportListener(new 
AmqpTransportListener());
+                        fireWatches();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable value) {
+                    if( state == CONNECTED || state == CONNECTING ) {
+                        failure = value;
+                        disconnect();
+                        fireWatches();
+                    }
+                }
+            };
+            // When a user name is configured, run the SASL client handshake
+            // before the final callback is invoked.
+            if( options.getUser()!=null ) {
+                onConnect = new SaslClientHandler(options, onConnect);
+            }
+            createTransport(options, onConnect);
+        } catch (Throwable e) {
+            // NOTE(review): state stays CONNECTING here; only failure is recorded.
+            failure = e;
+        }
+        fireWatches();
+        return this;
+    }
+
+    /** @return the current lifecycle state of this transport */
+    public TransportState getState() {
+        return this.state;
+    }
+
+    /**
+     * Creates and starts a transport to the AMQP server and notifies
+     * onConnect once the transport is connected or the connect failed.
+     * <p>
+     * An {@code SslTransport} is used when the options carry an SSL context,
+     * a plain {@code TcpTransport} otherwise.  If the host URI has no port,
+     * the standard AMQP port is used: 5671 with TLS, 5672 without.
+     *
+     * @param options   the connect options
+     * @param onConnect called with success or failure of the connect attempt
+     * @throws Exception if the host URI cannot be built or the transport cannot be set up
+     */
+    void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception {
+        final boolean secure = options.getSslContext() != null;
+        final TcpTransport transport;
+        if( secure ) {
+            SslTransport ssl = new SslTransport();
+            ssl.setSSLContext(options.getSslContext());
+            transport = ssl;
+        } else {
+            transport = new TcpTransport();
+        }
+
+        URI host = options.getHost();
+        if( host.getPort() == -1 ) {
+            // IANA registrations: 5671 is "amqps" (TLS), 5672 is plain "amqp".
+            // The two values used to be swapped.
+            final int defaultPort = secure ? 5671 : 5672;
+            host = new URI(host.getScheme()+"://"+host.getHost()+":"+defaultPort);
+        }
+
+
+        transport.setBlockingExecutor(options.getBlockingExecutor());
+        transport.setDispatchQueue(options.getDispatchQueue());
+
+        transport.setMaxReadRate(options.getMaxReadRate());
+        transport.setMaxWriteRate(options.getMaxWriteRate());
+        transport.setReceiveBufferSize(options.getReceiveBufferSize());
+        transport.setSendBufferSize(options.getSendBufferSize());
+        transport.setTrafficClass(options.getTrafficClass());
+        transport.setUseLocalHost(options.isUseLocalHost());
+
+        transport.setTransportListener(new DefaultTransportListener(){
+            public void onTransportConnected() {
+                if(state==CONNECTING) {
+                    state = CONNECTED;
+                    onConnect.onSuccess(null);
+                    transport.resumeRead();
+                }
+            }
+
+            public void onTransportFailure(final IOException error) {
+                if(state==CONNECTING) {
+                    onConnect.onFailure(error);
+                }
+            }
+
+        });
+        // connecting() is invoked exactly once; it used to be called a second
+        // time before the listener was installed.
+        transport.connecting(host, options.getLocalAddress());
+        bind(transport);
+        transport.start(NOOP);
+    }
+
+    /**
+     * Callback stage that runs the client side of the SASL handshake once the
+     * transport is connected, and only then reports success or failure to the
+     * next callback.
+     */
+    class SaslClientHandler extends ChainedCallback<Void, Void> {
+
+        private final AmqpConnectOptions options;
+
+        public SaslClientHandler(AmqpConnectOptions options, Callback<Void> 
next) {
+            super(next);
+            this.options = options;
+        }
+
+        /**
+         * Puts the proton SASL layer into client mode, pumps out the pending
+         * output and installs a listener that drives the handshake.
+         */
+        public void onSuccess(final Void value) {
+            final Sasl s = protonTransport.sasl();
+            s.client();
+            pumpOut();
+            hawtdispatchTransport.setTransportListener(new 
AmqpTransportListener() {
+
+                // Becomes null once the handshake has ended (success or failure).
+                Sasl sasl = s;
+
+                @Override
+                void process() {
+                    if (sasl != null) {
+                        sasl = processSaslEvent(sasl);
+                        if (sasl == null) {
+                            // once sasl handshake is done.. we need to read 
the protocol header again.
+                            ((AmqpProtocolCodec) 
hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+                        }
+                    }
+                }
+
+                @Override
+                public void onTransportFailure(IOException error) {
+                    next.onFailure(error);
+                }
+
+                @Override
+                void onFailure(Throwable error) {
+                    next.onFailure(error);
+                }
+
+                // Guards against answering the mechanism list more than once.
+                boolean authSent = false;
+
+                // Returns the sasl object while the handshake is still running,
+                // or null once next has been notified of the result.
+                private Sasl processSaslEvent(Sasl sasl) {
+                    if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) {
+                        next.onSuccess(null);
+                        return null;
+                    }
+                    HashSet<String> mechanisims = new 
HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms()));
+                    if (!authSent && !mechanisims.isEmpty()) {
+                        if (mechanisims.contains("PLAIN")) {
+                            authSent = true;
+                            // Response layout: 0x00, user, 0x00, optional password.
+                            DataByteArrayOutputStream os = new 
DataByteArrayOutputStream();
+                            try {
+                                os.writeByte(0);
+                                os.write(new UTF8Buffer(options.getUser()));
+                                os.writeByte(0);
+                                if (options.getPassword() != null) {
+                                    os.write(new 
UTF8Buffer(options.getPassword()));
+                                }
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                            Buffer buffer = os.toBuffer();
+                            sasl.setMechanisms(new String[]{"PLAIN"});
+                            sasl.send(buffer.data, buffer.offset, 
buffer.length);
+                        } else if (mechanisims.contains("ANONYMOUS")) {
+                            // ANONYMOUS is answered with an empty response.
+                            authSent = true;
+                            sasl.setMechanisms(new String[]{"ANONYMOUS"});
+                            sasl.send(new byte[0], 0, 0);
+                        } else {
+                            next.onFailure(Support.illegalState("Remote does 
not support plain password authentication."));
+                            return null;
+                        }
+                    }
+                    return sasl;
+                }
+            });
+        }
+    }
+
+    /**
+     * Listener installed on accepted transports.  It inspects the first
+     * protocol header: protocol id 3 starts a SASL handshake via the
+     * AmqpListener, anything else hands over to a plain AmqpTransportListener.
+     */
+    class SaslServerListener extends AmqpTransportListener {
+        // Handshake state returned by the AmqpListener; null when no handshake is running.
+        Sasl sasl;
+
+        @Override
+        public void onTransportCommand(Object command) {
+            try {
+                if (command.getClass() == AmqpHeader.class) {
+                    AmqpHeader header = (AmqpHeader)command;
+                    switch( header.getProtocolId() ) {
+                        case 3: // Client will be using SASL for auth..
+                            if( listener!=null ) {
+                                sasl = 
listener.processSaslConnect(protonTransport);
+                                break;
+                            }
+                            // no listener: falls through to the default branch
+                        default:
+                            // Replace this listener and let the new one handle the header.
+                            AmqpTransportListener listener = new 
AmqpTransportListener();
+                            
hawtdispatchTransport.setTransportListener(listener);
+                            listener.onTransportCommand(command);
+                            return;
+                    }
+                    command = header.getBuffer();
+                }
+            } catch (Exception e) {
+                // NOTE(review): execution continues with super.onTransportCommand
+                // after the failure was reported - confirm this is intended.
+                onFailure(e);
+            }
+            super.onTransportCommand(command);
+        }
+
+        @Override
+        void process() {
+            // NOTE(review): listener is dereferenced without a null check here.
+            if (sasl != null) {
+                sasl = listener.processSaslEvent(sasl);
+            }
+            if (sasl == null) {
+                // once sasl handshake is done.. we need to read the protocol 
header again.
+                ((AmqpProtocolCodec) 
hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+                hawtdispatchTransport.setTransportListener(new 
AmqpTransportListener());
+            }
+        }
+    }
+
+    /**
+     * Wraps an already accepted (server side) transport.
+     *
+     * @param transport the accepted HawtDispatch transport; its dispatch queue is reused
+     * @return the AMQP transport bound to it
+     */
+    public static AmqpTransport accept(Transport transport) {
+        final AmqpTransport amqp = new AmqpTransport(transport.getDispatchQueue());
+        return amqp.accepted(transport);
+    }
+
+    /**
+     * Marks this transport as CONNECTED, binds it to the accepted transport
+     * and installs the server side SASL listener.
+     *
+     * @return this transport
+     */
+    private AmqpTransport accepted(final Transport transport) {
+        this.state = CONNECTED;
+        this.bind(transport);
+        this.hawtdispatchTransport.setTransportListener(new SaslServerListener());
+        return this;
+    }
+
+    /**
+     * Remembers the HawtDispatch transport, creates a proton transport bound
+     * to the connection and installs an AmqpProtocolCodec unless the transport
+     * already has a codec.
+     */
+    private void bind(final Transport transport) {
+        hawtdispatchTransport = transport;
+        protonTransport = (ProtonJTransport) org.apache.qpid.proton.engine.Transport.Factory.create();
+        protonTransport.bind(connection);
+        if( transport.getProtocolCodec() != null ) {
+            // keep the codec that was configured by the caller
+            return;
+        }
+        try {
+            transport.setProtocolCodec(new AmqpProtocolCodec());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Schedules the task on the defers source unless it is already scheduled.
+     *
+     * @param defer the task to run later
+     */
+    public void defer(Defer defer) {
+        if( defer.defered ) {
+            return;
+        }
+        defer.defered = true;
+        defers.merge(defer);
+    }
+
+    /**
+     * Requests that pending proton output gets written to the transport.
+     * Must be called on this transport's dispatch queue; the actual work is deferred.
+     */
+    public void pumpOut() {
+        this.assertExecuting();
+        this.defer(deferedPumpOut);
+    }
+
+    private Defer deferedPumpOut = new Defer() {
+        public void run() {
+            doPumpOut();
+        }
+    };
+
+    private void doPumpOut() {
+        switch(state) {
+            case CONNECTING:
+            case CONNECTED:
+                break;
+            default:
+                return;
+        }
+
+        int size = 
hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
+        byte data[] = new byte[size];
+        boolean done = false;
+        int pumped = 0;
+        while( !done && !hawtdispatchTransport.full() ) {
+            int count = protonTransport.output(data, 0, size);
+            if( count > 0 ) {
+                pumped += count;
+                boolean accepted = hawtdispatchTransport.offer(new 
Buffer(data, 0, count));
+                assert accepted: "Should be accepted since the transport was 
not full";
+            } else {
+                done = true;
+            }
+        }
+        if( pumped > 0 && !hawtdispatchTransport.full() ) {
+            listener.processRefill();
+        }
+    }
+
+    /** SASL handshake state processed by fireListenerEvents(); null when none is running. */
+    public Sasl sasl;
+
+    /**
+     * Fires the watches and then walks the connection, its sessions, links
+     * and work list, handing each to the listener.  Ends with a refill
+     * notification.
+     */
+    public void fireListenerEvents() {
+        fireWatches();
+
+        if( sasl != null ) {
+            sasl = listener.processSaslEvent(sasl);
+            if( sasl == null ) {
+                // once sasl handshake is done.. we need to read the protocol header again.
+                AmqpProtocolCodec codec = (AmqpProtocolCodec) this.hawtdispatchTransport.getProtocolCodec();
+                codec.readProtocolHeader();
+            }
+        }
+
+        context(connection).fireListenerEvents(listener);
+
+        for( Session session = connection.sessionHead(ALL_SET, ALL_SET);
+             session != null;
+             session = session.next(ALL_SET, ALL_SET) ) {
+            context(session).fireListenerEvents(listener);
+        }
+
+        for( Link link = connection.linkHead(ALL_SET, ALL_SET);
+             link != null;
+             link = link.next(ALL_SET, ALL_SET) ) {
+            context(link).fireListenerEvents(listener);
+        }
+
+        for( Delivery delivery = connection.getWorkHead();
+             delivery != null;
+             delivery = delivery.getWorkNext() ) {
+            listener.processDelivery(delivery);
+        }
+
+        listener.processRefill();
+    }
+
+
+    /** @return the proton connection owned by this transport */
+    public ProtonJConnection connection() {
+        return this.connection;
+    }
+
+    /** Receiver of transport events; starts out as a default AmqpListener. */
+    AmqpListener listener = new AmqpListener();
+
+    /** @return the listener that currently receives transport events */
+    public AmqpListener getListener() {
+        return this.listener;
+    }
+
+    /** @param value the listener that should receive transport events from now on */
+    public void setListener(AmqpListener listener) {
+        final AmqpListener replacement = listener;
+        this.listener = replacement;
+    }
+
+    /**
+     * Returns the EndpointContext attached to the endpoint, creating and
+     * attaching one on first use.
+     *
+     * @param endpoint the connection, session or link
+     * @return the endpoint's context, never null
+     */
+    public EndpointContext context(Endpoint endpoint) {
+        final EndpointContext existing = (EndpointContext) endpoint.getContext();
+        if( existing != null ) {
+            return existing;
+        }
+        final EndpointContext created = new EndpointContext(this, endpoint);
+        endpoint.setContext(created);
+        return created;
+    }
+
+    /**
+     * Transport listener used once the connection is up: feeds received
+     * bytes into the proton transport and forwards events to the AmqpListener.
+     */
+    class AmqpTransportListener extends DefaultTransportListener {
+
+        @Override
+        public void onTransportConnected() {
+            if( listener == null ) {
+                return;
+            }
+            listener.processTransportConnected();
+        }
+
+        @Override
+        public void onRefill() {
+            if( listener == null ) {
+                return;
+            }
+            listener.processRefill();
+        }
+
+        /**
+         * Pours a decoded header or frame into the proton transport, then
+         * processes the resulting events and schedules a pump of the output.
+         * Ignored unless the state is CONNECTED.
+         */
+        @Override
+        public void onTransportCommand(Object command) {
+            if( state != CONNECTED ) {
+                return;
+            }
+            try {
+                final Buffer payload = command.getClass() == AmqpHeader.class
+                        ? ((AmqpHeader) command).getBuffer()
+                        : (Buffer) command;
+                final ByteBuffer pending = payload.toByteBuffer();
+                // Always run at least one pour/process round, then repeat
+                // until every byte has been consumed.
+                do {
+                    ByteBufferUtils.pour(pending, protonTransport.getInputBuffer());
+                    protonTransport.processInput();
+                } while (pending.hasRemaining());
+                process();
+                pumpOut();
+            } catch (Exception e) {
+                onFailure(e);
+            }
+        }
+
+        /** Hook run after input was processed; subclasses replace it during the SASL handshake. */
+        void process() {
+            fireListenerEvents();
+        }
+
+        @Override
+        public void onTransportFailure(IOException error) {
+            if( state != CONNECTED ) {
+                return;
+            }
+            failure = error;
+            if( listener != null ) {
+                listener.processTransportFailure(error);
+                fireWatches();
+            }
+        }
+
+        /** Records the failure and, if a listener is set, notifies it and fires the watches. */
+        void onFailure(Throwable error) {
+            failure = error;
+            if( listener == null ) {
+                return;
+            }
+            listener.processFailure(error);
+            fireWatches();
+        }
+    }
+
+    /**
+     * Starts stopping the underlying transport.  Only acts while CONNECTING or
+     * CONNECTED; the state becomes DISCONNECTED (and the watches fire) once the
+     * stop has completed.  Must be called on this transport's dispatch queue.
+     */
+    public void disconnect() {
+        assertExecuting();
+        if( state != CONNECTING && state != CONNECTED ) {
+            return;
+        }
+        state = DISCONNECTING;
+        if( hawtdispatchTransport == null ) {
+            return;
+        }
+        final Task onStopped = new Task(){
+            public void run() {
+                state = DISCONNECTED;
+                hawtdispatchTransport = null;
+                protonTransport = null;
+                fireWatches();
+            }
+        };
+        hawtdispatchTransport.stop(onStopped);
+    }
+
+    /** @return the dispatch queue this transport runs on */
+    public DispatchQueue queue() {
+        return this.queue;
+    }
+
+    /** Delegates to {@code assertExecuting()} of this transport's dispatch queue. */
+    public void assertExecuting() {
+        final DispatchQueue owner = queue();
+        owner.assertExecuting();
+    }
+
+    /**
+     * Registers a watch that completes the callback once the connect attempt
+     * is over: with the recorded failure if there is one, otherwise with
+     * success as soon as the state is no longer CONNECTING.
+     *
+     * @param cb the callback to complete
+     */
+    public void onTransportConnected(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( failure != null ) {
+                    cb.onFailure(failure);
+                } else if( state != CONNECTING ) {
+                    cb.onSuccess(null);
+                } else {
+                    // still connecting: keep the watch registered
+                    return false;
+                }
+                return true;
+            }
+        });
+    }
+
+    /**
+     * Registers a watch that completes the callback once the state is DISCONNECTED.
+     *
+     * @param cb the callback to complete
+     */
+    public void onTransportDisconnected(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( state != DISCONNECTED ) {
+                    return false;
+                }
+                cb.onSuccess(null);
+                return true;
+            }
+        });
+    }
+
+    /**
+     * Registers a watch that hands the recorded failure to the callback's
+     * {@code onSuccess} as soon as a failure exists.
+     *
+     * @param cb receives the failure
+     */
+    public void onTransportFailure(final Callback<Throwable> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( failure == null ) {
+                    return false;
+                }
+                cb.onSuccess(failure);
+                return true;
+            }
+        });
+    }
+
+    /** @return the recorded failure, or {@code null} if none was recorded */
+    public Throwable getFailure() {
+        return this.failure;
+    }
+
+    /**
+     * Installs a protocol tracer on the proton transport.
+     *
+     * @param protocolTracer the tracer to forward to the proton transport
+     */
+    public void setProtocolTracer(ProtocolTracer protocolTracer) {
+        this.protonTransport.setProtocolTracer(protocolTracer);
+    }
+
+    /** @return the protocol tracer reported by the proton transport */
+    public ProtocolTracer getProtocolTracer() {
+        return this.protonTransport.getProtocolTracer();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
new file mode 100644
index 0000000..eee8241
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * A {@link Task} that additionally carries a package-visible {@code defered}
+ * flag. Subclasses supply the {@code run()} implementation.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class Defer extends Task {
+    // NOTE(review): presumably set by the code that postpones this task;
+    // nothing in this class reads or writes it - confirm against callers.
+    boolean defered;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
new file mode 100644
index 0000000..c12a849
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * Pairs a proton {@link Endpoint} with the {@link AmqpTransport} it belongs to,
+ * an optional attachment object, and a flag recording whether a listener
+ * callback for the endpoint is still outstanding.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class EndpointContext {
+
+    private final AmqpTransport transport;
+    private final Endpoint endpoint;
+    private Object attachment;
+    /** Set before a listener is invoked; cleared when its {@link ProcessedTask} runs. */
+    boolean listenerProcessing;
+
+    public EndpointContext(AmqpTransport transport, Endpoint endpoint) {
+        this.endpoint = endpoint;
+        this.transport = transport;
+    }
+
+    /**
+     * Completion task handed to the listener. Running it checks that the
+     * transport is executing, clears {@code listenerProcessing} and then calls
+     * {@code transport.pumpOut()}.
+     */
+    class ProcessedTask extends Task {
+        @Override
+        public void run() {
+            transport.assertExecuting();
+            listenerProcessing = false;
+            transport.pumpOut();
+        }
+    }
+
+    /**
+     * Notifies {@code listener} of a pending remote open or remote close, then
+     * runs the attachment if it is a {@link Task}.
+     * <p>
+     * No listener call is made while {@code listener} is {@code null} or a
+     * previous callback is still outstanding. The attachment task runs on every
+     * invocation regardless.
+     *
+     * @param listener the listener to notify; may be {@code null}
+     */
+    public void fireListenerEvents(AmqpListener listener) {
+        final boolean canNotify = listener != null && !listenerProcessing;
+        if (canNotify) {
+            if (remoteOpenPending()) {
+                listenerProcessing = true;
+                listener.processRemoteOpen(endpoint, new ProcessedTask());
+            } else if (remoteClosePending()) {
+                listenerProcessing = true;
+                listener.processRemoteClose(endpoint, new ProcessedTask());
+            }
+        }
+        // instanceof is false for null, so no separate null test is needed
+        if (attachment instanceof Task) {
+            ((Task) attachment).run();
+        }
+    }
+
+    /** True when the remote side has left UNINITIALIZED but the local side has not. */
+    private boolean remoteOpenPending() {
+        return endpoint.getLocalState() == EndpointState.UNINITIALIZED
+                && endpoint.getRemoteState() != EndpointState.UNINITIALIZED;
+    }
+
+    /** True when the local side is ACTIVE and the remote side is CLOSED. */
+    private boolean remoteClosePending() {
+        return endpoint.getLocalState() == EndpointState.ACTIVE
+                && endpoint.getRemoteState() == EndpointState.CLOSED;
+    }
+
+    /** @return the attachment last stored with {@link #setAttachment(Object)}, or {@code null} */
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    /**
+     * @param clazz the expected attachment type
+     * @return the attachment cast to {@code clazz}
+     * @throws ClassCastException if the attachment is non-null and not a {@code clazz}
+     */
+    public <T> T getAttachment(Class<T> clazz) {
+        final Object current = getAttachment();
+        return clazz.cast(current);
+    }
+
+    /** @param attachment the object to associate with this endpoint; may be {@code null} */
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
new file mode 100644
index 0000000..8d6f83b
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+/**
+ * Static factory helpers that build {@link IllegalStateException} instances
+ * with fixed messages. The helpers only create the exception; throwing it is
+ * left to the caller.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Support {
+
+    /**
+     * Creates an {@link IllegalStateException} carrying {@code msg}.
+     * <p>
+     * The exception constructor already records the stack trace, so no
+     * additional {@code fillInStackTrace()} call (and therefore no downcast of
+     * its {@code Throwable} result) is needed.
+     *
+     * @param msg the detail message; may be {@code null}
+     * @return a new, not yet thrown exception
+     */
+    public static IllegalStateException illegalState(String msg) {
+        return new IllegalStateException(msg);
+    }
+
+    /** @return a new exception with the message {@code "Unhandled event."} */
+    public static IllegalStateException createUnhandledEventError() {
+        return illegalState("Unhandled event.");
+    }
+
+    /** @return a new exception reporting that no connection listener is set */
+    public static IllegalStateException createListenerNotSetError() {
+        return illegalState("No connection listener set to handle message received from the server.");
+    }
+
+    /** @return a new exception with the message {@code "Disconnected"} */
+    public static IllegalStateException createDisconnectedError() {
+        return illegalState("Disconnected");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
new file mode 100644
index 0000000..6bb7603
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+/**
+ * A check that reports, each time it is executed, whether it has been
+ * triggered.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class Watch {
+
+    /**
+     * Evaluates this watch.
+     *
+     * @return {@code true} if the watch has been triggered
+     */
+    public abstract boolean execute();
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
new file mode 100644
index 0000000..a4b1591
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.Task;
+
+import java.util.LinkedList;
+
+/**
+ * Base class that keeps a list of pending {@link Watch}es and evaluates them
+ * in a task submitted to the current dispatch queue.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class WatchBase {
+
+    private LinkedList<Watch> watches = new LinkedList<Watch>();
+
+    /**
+     * Appends {@code task} to the pending watches and schedules an evaluation
+     * pass.
+     *
+     * @param task the watch to register
+     */
+    protected void addWatch(final Watch task) {
+        watches.add(task);
+        fireWatches();
+    }
+
+    /**
+     * Schedules one evaluation pass over the pending watches on the queue
+     * returned by {@code Dispatch.getCurrentQueue()}. Does nothing when no
+     * watch is pending at the time of the call.
+     */
+    protected void fireWatches() {
+        if (watches.isEmpty()) {
+            return;
+        }
+        Dispatch.getCurrentQueue().execute(new Task() {
+            @Override
+            public void run() {
+                // Swap in a fresh list first, so watches registered while the
+                // old ones execute land in the new list instead of the one
+                // being iterated.
+                final LinkedList<Watch> pending = watches;
+                watches = new LinkedList<Watch>();
+                for (Watch candidate : pending) {
+                    final boolean triggered = candidate.execute();
+                    if (!triggered) {
+                        // not triggered yet: keep it for the next pass
+                        watches.add(candidate);
+                    }
+                }
+            }
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
 
b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
new file mode 100644
index 0000000..7cbed14
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
@@ -0,0 +1,292 @@
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.hawtdispatch.test.MessengerServer;
+import org.apache.qpid.proton.message.Message;
+import org.fusesource.hawtdispatch.Task;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+/**
+ * End-to-end sample for the HawtDispatch AMQP client API.
+ * <p>
+ * A {@link MessengerServer} is started before each test. The test connects,
+ * opens a sender and a receiver on one session, and sends text messages one at
+ * a time: every settled incoming delivery decrements a countdown and triggers
+ * the next send until the countdown reaches zero. Sender, receiver, session
+ * and connection are then closed in staggered steps, and the test asserts that
+ * the server received the expected number of messages.
+ */
+public class SampleTest {
+
+    private static final Logger _logger = Logger.getLogger(SampleTest.class.getName());
+
+    private MessengerServer server;
+
+    @Before
+    public void startServer() {
+        server = new MessengerServer();
+        server.start();
+    }
+
+    @After
+    public void stopServer() {
+        server.stop();
+    }
+
+    @Test
+    public void test() throws Exception {
+        int expected = 10;
+        final AtomicInteger countdown = new AtomicInteger(expected);
+        AmqpConnectOptions options = new AmqpConnectOptions();
+        final String container = UUID.randomUUID().toString();
+        // A URISyntaxException here means the test is misconfigured; let it
+        // fail the test instead of continuing with half-configured options.
+        options.setHost(server.getHost(), server.getPort());
+        options.setLocalContainerId(container);
+        options.setUser("anonymous");
+        options.setPassword("changeit");
+        final AmqpConnection conn = AmqpConnection.connect(options);
+        _logger.fine("connection queue");
+        conn.queue().execute(new Task() {
+
+            @Override
+            public void run() {
+                _logger.fine("connection running, setup callbacks");
+                conn.onTransportFailure(new Callback<Throwable>() {
+
+                    @Override
+                    public void onSuccess(Throwable value) {
+                        _logger.fine("transportFailure Success? " + str(value));
+                        conn.close();
+                    }
+
+                    @Override
+                    public void onFailure(Throwable value) {
+                        _logger.fine("transportFailure Trouble! " + str(value));
+                        conn.close();
+                    }
+                });
+
+                conn.onConnected(new Callback<Void>() {
+
+                    @Override
+                    public void onSuccess(Void value) {
+                        _logger.fine("on connect Success! in container " + container);
+                        final AmqpSession session = conn.createSession();
+                        Target rqtarget = new Target();
+                        rqtarget.setAddress("rq-tgt");
+                        final AmqpSender sender =
+                                session.createSender(rqtarget, QoS.AT_LEAST_ONCE, "request-yyy");
+                        Source rqsource = new Source();
+                        rqsource.setAddress("rs-src");
+                        sender.getEndpoint().setSource(rqsource);
+                        Source rssource = new Source();
+                        rssource.setAddress("rs-src");
+                        final AmqpReceiver receiver =
+                                session.createReceiver(rssource, QoS.AT_LEAST_ONCE, 10, "response-yyy");
+                        Target rstarget = new Target();
+                        final String address = "rs-tgt";
+                        rstarget.setAddress(address);
+                        receiver.getEndpoint().setTarget(rstarget);
+                        sender.onRemoteClose(new Callback<ErrorCondition>() {
+
+                            @Override
+                            public void onSuccess(ErrorCondition value) {
+                                _logger.fine("sender remote close!" + str(value));
+                            }
+
+                            @Override
+                            public void onFailure(Throwable value) {
+                                _logger.fine("sender remote close Trouble!" + str(value));
+                                conn.close();
+                            }
+
+                        });
+                        receiver.onRemoteClose(new Callback<ErrorCondition>() {
+
+                            @Override
+                            public void onSuccess(ErrorCondition value) {
+                                _logger.fine("receiver remote close!" + str(value));
+                            }
+
+                            @Override
+                            public void onFailure(Throwable value) {
+                                _logger.fine("receiver remote close Trouble!" + str(value));
+                                conn.close();
+                            }
+
+                        });
+
+                        // Sends one numbered message and logs its delivery state changes.
+                        final Task work = new Task() {
+
+                            private final AtomicInteger count = new AtomicInteger();
+
+                            @Override
+                            public void run() {
+                                Message message = session.createTextMessage(
+                                        "hello world! " + String.valueOf(count.incrementAndGet()));
+                                message.setAddress("amqp://joze/rq-src");
+                                String reply_to = "amqp://" + container + "/" + address;
+                                message.setReplyTo(reply_to);
+                                message.setCorrelationId("correlator");
+                                final MessageDelivery md = sender.send(message);
+                                md.onRemoteStateChange(new Callback<DeliveryState>() {
+
+                                    @Override
+                                    public void onSuccess(DeliveryState value) {
+                                        _logger.fine("delivery remote state change! " + str(value) +
+                                                " local: "+ str(md.getLocalState()) +
+                                                " remote: " + str(md.getRemoteState()));
+                                    }
+
+                                    @Override
+                                    public void onFailure(Throwable value) {
+                                        _logger.fine("remote state change Trouble!" + str(value));
+                                        conn.close();
+                                    }
+
+                                });
+                                md.onSettle(new Callback<DeliveryState>() {
+
+                                    @Override
+                                    public void onSuccess(DeliveryState value) {
+                                        _logger.fine("delivery settled! " + str(value) +
+                                                " local: "+ str(md.getLocalState()) +
+                                                " remote: " + str(md.getRemoteState()));
+                                        _logger.fine("sender settle mode state " +
+                                                " local receiver " + str(sender.getEndpoint().getReceiverSettleMode()) +
+                                                " local sender " + str(sender.getEndpoint().getSenderSettleMode()) +
+                                                " remote receiver " + str(sender.getEndpoint().getRemoteReceiverSettleMode()) +
+                                                " remote sender " + str(sender.getEndpoint().getRemoteSenderSettleMode()) +
+                                                ""
+                                                );
+                                    }
+
+                                    @Override
+                                    public void onFailure(Throwable value) {
+                                        _logger.fine("delivery sending Trouble!" + str(value));
+                                        conn.close();
+                                    }
+                                });
+                            }
+
+                        };
+                        receiver.setDeliveryListener(new AmqpDeliveryListener() {
+
+                            @Override
+                            public void onMessageDelivery(MessageDelivery delivery) {
+                                Message message = delivery.getMessage();
+                                _logger.fine("incoming message delivery! " +
+                                        " local " + str(delivery.getLocalState()) +
+                                        " remote " + str(delivery.getRemoteState()) +
+                                        " message " + str(message.getBody()) +
+                                        "");
+                                delivery.onSettle(new Callback<DeliveryState>() {
+
+                                    @Override
+                                    public void onSuccess(DeliveryState value) {
+                                        _logger.fine("incoming message settled! ");
+                                        int i = countdown.decrementAndGet();
+                                        if (i > 0) {
+                                            _logger.fine("More work " + str(i));
+                                            work.run();
+                                        } else {
+                                            // All messages done: close sender, receiver, session and
+                                            // connection 100 ms apart, in that order.
+                                            conn.queue().executeAfter(100, TimeUnit.MILLISECONDS, new Task() {
+
+                                                @Override
+                                                public void run() {
+                                                    _logger.fine("stopping sender");
+                                                    sender.close();
+                                                }
+                                            });
+                                            conn.queue().executeAfter(200, TimeUnit.MILLISECONDS, new Task() {
+
+                                                @Override
+                                                public void run() {
+                                                    _logger.fine("stopping receiver");
+                                                    receiver.close();
+                                                }
+                                            });
+                                            conn.queue().executeAfter(300, TimeUnit.MILLISECONDS, new Task() {
+
+                                                @Override
+                                                public void run() {
+                                                    _logger.fine("stopping session");
+                                                    session.close();
+                                                }
+                                            });
+                                            conn.queue().executeAfter(400, TimeUnit.MILLISECONDS, new Task() {
+
+                                                @Override
+                                                public void run() {
+                                                    _logger.fine("stopping connection");
+                                                    conn.close();
+                                                }
+                                            });
+                                        }
+                                    }
+
+                                    @Override
+                                    public void onFailure(Throwable value) {
+                                        _logger.fine("trouble settling incoming message " + str(value));
+                                        conn.close();
+                                    }
+                                });
+                                delivery.settle();
+                            }
+
+                        });
+
+                        // start the receiver
+                        receiver.resume();
+
+                        // send first message
+                        conn.queue().execute(work);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable value) {
+                        _logger.fine("on connect Failure?" + str(value));
+                        conn.close();
+                    }
+                });
+                _logger.fine("connection setup done");
+            }
+
+        });
+        try {
+            _logger.fine("Waiting...");
+            Future<Void> disconnectedFuture = conn.getDisconnectedFuture();
+            disconnectedFuture.await(10, TimeUnit.SECONDS);
+            _logger.fine("done");
+            assertEquals(expected, server.getMessagesReceived());
+        } catch (Exception e) {
+            // Log before rethrowing so the failure is visible in the test log.
+            _logger.log(Level.SEVERE, "Test failed, possibly due to timeout", e);
+            throw e;
+        }
+    }
+
+    /** Null-safe {@code toString()}: returns {@code "null"} for a null value. */
+    private String str(Object value) {
+        return String.valueOf(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
 
b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
new file mode 100644
index 0000000..dae68b6
--- /dev/null
+++ 
b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
@@ -0,0 +1,135 @@
+package org.apache.qpid.proton.hawtdispatch.test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.qpid.proton.InterruptException;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.messenger.Tracker;
+
+/**
+ * Test helper that runs a Proton {@link Messenger} on a daemon thread. It
+ * counts every message it receives and sends messages that carry a reply-to
+ * address back to that address.
+ */
+public class MessengerServer {
+       /** Message body that is meant to make the server reject the delivery instead of accepting it. */
+       public static final String REJECT_ME = "*REJECT-ME*";
+       /** Milliseconds {@link #stop()} waits for the server thread to finish. */
+       private int timeout = 1000;
+       private String host = "127.0.0.1";
+       private int port = 55555;
+       private Messenger msgr;
+       private AtomicInteger messagesReceived = new AtomicInteger(0);
+       private AtomicInteger messagesSent = new AtomicInteger(0);
+       private AtomicBoolean serverShouldRun = new AtomicBoolean();
+       /** Exception that terminated the server thread, if any; rethrown (wrapped) by {@link #stop()}. */
+       private AtomicReference<Throwable> issues = new AtomicReference<Throwable>();
+       private Thread thread;
+       /** Released once the server thread has finished (or failed) its start-up sequence. */
+       private CountDownLatch serverStart;
+
+       public MessengerServer() {
+       }
+
+       /**
+        * Starts the server thread and blocks until its start-up sequence has
+        * completed. A start-up failure does not throw here; it is recorded and
+        * reported by {@link #stop()}.
+        *
+        * @throws IllegalStateException if the server is already running
+        */
+       public void start() {
+               if (!serverShouldRun.compareAndSet(false, true)) {
+                       throw new IllegalStateException("started twice");
+               }
+               msgr = Proton.messenger();
+               serverStart = new CountDownLatch(1);
+               thread = new Thread(new Runnable() {
+
+                       @Override
+                       public void run() {
+                               runServer();
+                       }
+
+               });
+               thread.setName("MessengerServer");
+               thread.setDaemon(true);
+               thread.start();
+               try {
+                       serverStart.await();
+               } catch (InterruptedException e) {
+                       msgr.interrupt();
+                       // Keep the interrupt visible to the caller instead of swallowing it.
+                       Thread.currentThread().interrupt();
+               }
+       }
+
+       /** Body of the server thread: start, subscribe, then serve until asked to stop. */
+       private void runServer() {
+               try {
+                       try {
+                               msgr.start();
+                               msgr.subscribe("amqp://~"+host+":"+String.valueOf(port));
+                       } finally {
+                               // Always release start(): if start-up throws, the caller must not wait forever.
+                               serverStart.countDown();
+                       }
+                       try {
+                               while(serverShouldRun.get()) {
+                                       msgr.recv(100);
+                                       while (msgr.incoming() > 0) {
+                                               handleIncoming();
+                                       }
+                               }
+                       } finally {
+                               msgr.stop();
+                       }
+               } catch (InterruptException ex) {
+                       // we're done
+               } catch (Exception ex) {
+                       issues.set(ex);
+               }
+       }
+
+       /** Takes one message from the incoming queue, accepts or rejects it and echoes it to its reply-to address. */
+       private void handleIncoming() {
+               Message msg = msgr.get();
+               messagesReceived.incrementAndGet();
+               Tracker tracker = msgr.incomingTracker();
+               // NOTE(review): getBody() may return a Section rather than a String, in which
+               // case this comparison never matches - confirm against the Message API.
+               if (REJECT_ME.equals(msg.getBody())) {
+                       msgr.reject(tracker , 0);
+               } else {
+                       msgr.accept(tracker, 0);
+               }
+               String replyTo = msg.getReplyTo();
+               if (replyTo != null) {
+                       msg.setAddress(replyTo);
+                       msgr.put(msg);
+                       // Count the reply so getMessagesSent() reflects what was queued for sending.
+                       messagesSent.incrementAndGet();
+                       msgr.settle(msgr.outgoingTracker(), 0);
+               }
+       }
+
+       /**
+        * Asks the server thread to stop and waits up to {@link #getTimeout()}
+        * milliseconds for it. Does nothing if the server is not running.
+        *
+        * @throws RuntimeException wrapping the exception that terminated the server thread, if there was one
+        */
+       public void stop() {
+               if (!serverShouldRun.compareAndSet(true, false)) {
+                       return;
+               }
+               if (serverStart.getCount() == 0)
+                       msgr.interrupt();
+               try {
+                       thread.join(timeout);
+               } catch (InterruptedException e) {
+                       // Restore the flag; shutdown continues so the messenger is still stopped below.
+                       Thread.currentThread().interrupt();
+               }
+               thread = null;
+               if (!msgr.stopped())
+                       msgr.stop();
+               Throwable throwable = issues.get();
+               if (throwable != null)
+                       throw new RuntimeException("Messenger server had problems", throwable);
+       }
+
+       public String getHost() {
+               return host;
+       }
+
+       public int getPort() {
+               return port;
+       }
+
+       public void setHost(String host) {
+               this.host = host;
+       }
+
+       public void setPort(int port) {
+               this.port = port;
+       }
+       public int getTimeout() {
+               return timeout;
+       }
+       public void setTimeout(int timeout) {
+               this.timeout = timeout;
+       }
+
+       /** @return number of messages taken from the incoming queue so far */
+       public int getMessagesReceived() {
+               return messagesReceived.get();
+       }
+
+       /** @return number of reply messages handed to the messenger so far */
+       public int getMessagesSent() {
+               return messagesSent.get();
+       }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/pom.xml b/contrib/proton-jms/pom.xml
new file mode 100644
index 0000000..4515cbb
--- /dev/null
+++ b/contrib/proton-jms/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <parent>
+    <groupId>org.apache.qpid</groupId>
+    <artifactId>proton-project</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../..</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>proton-jms</artifactId>
+  <name>proton-jms</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>proton-j</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jms_1.1_spec</artifactId>
+      <version>1.1.1</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build> 
+  </build>
+  <scm>
+    <url>http://svn.apache.org/viewvc/qpid/proton/</url>
+  </scm>
+
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java
 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java
new file mode 100644
index 0000000..4beb401
--- /dev/null
+++ 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.jms;
+
+import javax.jms.Message;
+
+/**
+ * Inbound transformer that stores the encoded AMQP message as the body of a
+ * JMS bytes message (the work of {@link AMQPRawInboundTransformer}) and then
+ * copies the decoded AMQP sections onto that JMS message with
+ * {@code populateMessage}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
+
+    public AMQPNativeInboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    /**
+     * @param amqpMessage the encoded AMQP message to convert
+     * @return the JMS message produced by the raw transformer, populated from the decoded AMQP message
+     */
+    @Override
+    public Message transform(EncodedMessage amqpMessage) throws Exception {
+        // Decoding happens first, so a decode failure surfaces before any JMS message is created.
+        final org.apache.qpid.proton.message.Message decoded = amqpMessage.decode();
+        final Message jms = super.transform(amqpMessage);
+        populateMessage(jms, decoded);
+        return jms;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
new file mode 100644
index 0000000..66ff0b1
--- /dev/null
+++ 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.jms;
+
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.message.ProtonJMessage;
+/**
+ * Outbound transformer for JMS bytes messages whose body already is an encoded
+ * AMQP message (flagged with the vendor {@code NATIVE} property). The bytes are
+ * forwarded as they are, except that the AMQP delivery count is refreshed on
+ * redelivery.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AMQPNativeOutboundTransformer extends OutboundTransformer {
+
+    public AMQPNativeOutboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    /**
+     * @param msg the JMS message to convert; may be {@code null}
+     * @return the encoded message, or {@code null} when {@code msg} is null, is
+     *         not a {@link BytesMessage}, or its {@code NATIVE} property is
+     *         false or cannot be read as a boolean
+     */
+    @Override
+    public EncodedMessage transform(Message msg) throws Exception {
+        if( msg == null )
+            return null;
+        if( !(msg instanceof BytesMessage) )
+            return null;
+        try {
+            if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
+                return null;
+            }
+        } catch (MessageFormatException e) {
+            return null;
+        }
+        return transform(this, (BytesMessage) msg);
+    }
+
+    /**
+     * Extracts the encoded AMQP bytes from {@code msg}. When the JMS delivery
+     * count is greater than one the message is decoded, its header delivery
+     * count is updated and it is encoded again.
+     *
+     * @param options supplies the vendor property prefix
+     * @param msg the bytes message holding the encoded AMQP message
+     * @return the encoded message, or {@code null} when the {@code MESSAGE_FORMAT}
+     *         property cannot be read as a long
+     * @throws IllegalStateException if the decoder stops consuming bytes before the body is exhausted
+     */
+    static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
+        long messageFormat;
+        try {
+            messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
+        } catch (MessageFormatException e) {
+            return null;
+        }
+        byte data[] = new byte[(int) msg.getBodyLength()];
+        int dataSize = data.length;
+        msg.readBytes(data);
+        msg.reset();
+
+        try {
+            int count = msg.getIntProperty("JMSXDeliveryCount");
+            if( count > 1 ) {
+
+                // decode...
+                ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
+                int offset = 0;
+                int len = data.length;
+                while( len > 0 ) {
+                    final int decoded = amqp.decode(data, offset, len);
+                    if( decoded <= 0 ) {
+                        // An assert alone would let this loop spin forever when assertions are disabled.
+                        throw new IllegalStateException("No progress decoding the message at offset " + offset);
+                    }
+                    offset += decoded;
+                    len -= decoded;
+                }
+
+                // Update the DeliveryCount header; the header section may be absent, so create it on demand.
+                org.apache.qpid.proton.amqp.messaging.Header header = amqp.getHeader();
+                if( header == null ) {
+                    header = new org.apache.qpid.proton.amqp.messaging.Header();
+                    amqp.setHeader(header);
+                }
+                header.setDeliveryCount(new UnsignedInteger(count));
+
+                // Re-encode: try a 4k buffer first and retry with an exact-fit buffer if it overflowed.
+                ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
+                final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+                int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+                if( overflow.position() > 0 ) {
+                    buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
+                    c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+                }
+                data = buffer.array();
+                dataSize = c;
+            }
+        } catch (JMSException ignored) {
+            // Best effort: when the delivery count cannot be read the original bytes are forwarded unchanged.
+        }
+
+        return new EncodedMessage(messageFormat, data, 0, dataSize);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java
 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java
new file mode 100644
index 0000000..9baabdf
--- /dev/null
+++ 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.jms;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+
+/**
+ * Inbound transformer that does not decode the AMQP message: the encoded bytes
+ * become the body of a JMS bytes message, the configured defaults supply the
+ * JMS header values, and two vendor properties record the message format and
+ * mark the message as native.
+ */
+public class AMQPRawInboundTransformer extends InboundTransformer {
+
+    public AMQPRawInboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    /**
+     * @param amqpMessage the encoded AMQP message to wrap
+     * @return a bytes message holding the encoded bytes
+     */
+    @Override
+    public Message transform(EncodedMessage amqpMessage) throws Exception {
+        final BytesMessage wrapped = vendor.createBytesMessage();
+        final byte[] encoded = amqpMessage.getArray();
+        final int start = amqpMessage.getArrayOffset();
+        final int size = amqpMessage.getLength();
+        wrapped.writeBytes(encoded, start, size);
+
+        wrapped.setJMSDeliveryMode(defaultDeliveryMode);
+        wrapped.setJMSPriority(defaultPriority);
+
+        // The timestamp is the moment of transformation; expiry is only set for a positive default TTL.
+        final long timestamp = System.currentTimeMillis();
+        wrapped.setJMSTimestamp(timestamp);
+        final boolean expires = defaultTtl > 0;
+        if( expires ) {
+            wrapped.setJMSExpiration(timestamp + defaultTtl);
+        }
+
+        final String formatKey = prefixVendor + "MESSAGE_FORMAT";
+        wrapped.setLongProperty(formatKey, amqpMessage.getMessageFormat());
+        final String nativeKey = prefixVendor + "NATIVE";
+        wrapped.setBooleanProperty(nativeKey, true);
+
+        return wrapped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java
 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java
new file mode 100644
index 0000000..a72198b
--- /dev/null
+++ 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.jms;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+
+/**
+ * Outbound transformer that picks the conversion per message: messages whose
+ * vendor {@code NATIVE} property is true go through the native transformation,
+ * all others through the JMS mapping transformation.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
+
+    public AutoOutboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    /**
+     * @param msg the JMS message to convert; may be {@code null}
+     * @return the encoded message, or {@code null} when {@code msg} is null or
+     *         is flagged native without being a {@link BytesMessage}
+     */
+    @Override
+    public EncodedMessage transform(Message msg) throws Exception {
+        if( msg == null ) {
+            return null;
+        }
+        final boolean flaggedNative = msg.getBooleanProperty(prefixVendor + "NATIVE");
+        if( !flaggedNative ) {
+            return JMSMappingOutboundTransformer.transform(this, msg);
+        }
+        // Native content can only be carried by a bytes message.
+        if( !(msg instanceof BytesMessage) ) {
+            return null;
+        }
+        return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java
 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java
new file mode 100644
index 0000000..19602c9
--- /dev/null
+++ 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.jms;
+
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.amqp.Binary;
+
+/**
+ * An encoded AMQP message: a message-format code together with the slice of a
+ * byte array that holds the encoded bytes.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class EncodedMessage
+{
+
+    private final Binary data;
+    final long messageFormat;
+
+    /**
+     * @param messageFormat the message-format code reported by {@link #getMessageFormat()}
+     * @param data array containing the encoded message
+     * @param offset index in {@code data} of the first encoded byte
+     * @param length number of encoded bytes
+     */
+    public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
+        this.data = new Binary(data, offset, length);
+        this.messageFormat = messageFormat;
+    }
+
+    public long getMessageFormat() {
+        return messageFormat;
+    }
+
+    /**
+     * Decodes the encoded bytes into a new Proton message, calling the decoder
+     * repeatedly until every byte has been consumed.
+     *
+     * @return the decoded message
+     * @throws IllegalStateException if the decoder consumes no bytes while input remains
+     */
+    public Message decode() throws Exception {
+        Message amqp = Message.Factory.create();
+
+        int offset = getArrayOffset();
+        int len = getLength();
+        while( len > 0 ) {
+            final int decoded = amqp.decode(getArray(), offset, len);
+            if( decoded <= 0 ) {
+                // An assert alone would let this loop spin forever when assertions are disabled.
+                throw new IllegalStateException("No progress decoding the message at offset " + offset);
+            }
+            offset += decoded;
+            len -= decoded;
+        }
+
+        return amqp;
+    }
+
+    /** @return number of encoded bytes */
+    public int getLength()
+    {
+        return data.getLength();
+    }
+
+    /** @return index in {@link #getArray()} of the first encoded byte */
+    public int getArrayOffset()
+    {
+        return data.getArrayOffset();
+    }
+
+    /** @return the array that contains the encoded bytes */
+    public byte[] getArray()
+    {
+        return data.getArray();
+    }
+
+    @Override
+    public String toString()
+    {
+        return data.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java
----------------------------------------------------------------------
diff --git 
a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java
 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java
new file mode 100644
index 0000000..0374e6a
--- /dev/null
+++ 
b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.jms;
+
+import org.apache.qpid.proton.amqp.*;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+
+import javax.jms.*;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public abstract class InboundTransformer {
+
+    JMSVendor vendor; // vendor hooks this class uses to create destinations and set JMSX* properties
+
+    public static final String TRANSFORMER_NATIVE = "native"; // NOTE(review): presumably names for selecting a transformer; not referenced in the visible code
+    public static final String TRANSFORMER_RAW = "raw";
+    public static final String TRANSFORMER_JMS = "jms";
+
+    String prefixVendor = "JMS_AMQP_"; // leading part of every vendor-specific JMS property name written here
+    String prefixDeliveryAnnotations = "DA_"; // follows prefixVendor for delivery-annotation entries
+    String prefixMessageAnnotations= "MA_"; // follows prefixVendor for message-annotation entries
+    String prefixFooter = "FT_"; // follows prefixVendor for footer entries
+
+    int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; // applied when the AMQP header has no durable flag
+    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; // applied when the AMQP header has no priority
+    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; // milliseconds; used for expiry when the AMQP header has no ttl
+
+    public InboundTransformer(JMSVendor vendor) { // vendor is stored as given, without a null check
+        this.vendor = vendor;
+    }
+
+    abstract public Message transform(EncodedMessage amqpMessage) throws 
Exception; // converts one encoded AMQP message into a JMS message; implemented by subclasses
+
+    public int getDefaultDeliveryMode() {
+        return defaultDeliveryMode;
+    }
+
+    public void setDefaultDeliveryMode(int defaultDeliveryMode) {
+        this.defaultDeliveryMode = defaultDeliveryMode;
+    }
+
+    public int getDefaultPriority() {
+        return defaultPriority;
+    }
+
+    public void setDefaultPriority(int defaultPriority) {
+        this.defaultPriority = defaultPriority;
+    }
+
+    public long getDefaultTtl() {
+        return defaultTtl;
+    }
+
+    public void setDefaultTtl(long defaultTtl) {
+        this.defaultTtl = defaultTtl;
+    }
+
+    public String getPrefixVendor() {
+        return prefixVendor;
+    }
+
+    public void setPrefixVendor(String prefixVendor) {
+        this.prefixVendor = prefixVendor;
+    }
+
+    public JMSVendor getVendor() {
+        return vendor;
+    }
+
+    public void setVendor(JMSVendor vendor) {
+        this.vendor = vendor;
+    }
+
+    /**
+     * Copies the header, delivery annotations, message annotations,
+     * application properties, properties and footer of a decoded AMQP message
+     * onto a JMS message.
+     * <p>
+     * A missing AMQP header is treated as an empty one, so the configured
+     * defaults supply delivery mode and priority. Annotation and footer entries
+     * become JMS properties named with the vendor prefix plus a section prefix.
+     * Entries with a {@code null} value that would drive a destination type or
+     * a JMSX* property are skipped rather than dereferenced.
+     *
+     * @param jms the JMS message to fill in
+     * @param amqp the decoded AMQP message to read from
+     * @throws Exception if a JMS setter or the vendor rejects a value
+     */
+    protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+        Header header = amqp.getHeader();
+        if( header==null ) {
+            header = new Header();
+        }
+
+        if( header.getDurable()!=null ) {
+            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+        } else {
+            jms.setJMSDeliveryMode(defaultDeliveryMode);
+        }
+        if( header.getPriority()!=null ) {
+            jms.setJMSPriority(header.getPriority().intValue());
+        } else {
+            jms.setJMSPriority(defaultPriority);
+        }
+        if( header.getFirstAcquirer() !=null ) {
+            jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
+        }
+        if( header.getDeliveryCount()!=null ) {
+            vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
+        }
+
+        final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
+        if( da!=null ) {
+            for (Map.Entry<?,?> entry : da.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
+            }
+        }
+
+        // Destination kinds for "to" and "reply-to"; refined below by message annotations.
+        Class<? extends Destination> toAttributes = Destination.class;
+        Class<? extends Destination> replyToAttributes = Destination.class;
+
+        final MessageAnnotations ma = amqp.getMessageAnnotations();
+        if( ma!=null ) {
+            for (Map.Entry<?,?> entry : ma.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                Object value = entry.getValue();
+                if( "x-opt-jms-type".equals(key) && value != null ) {
+                    jms.setJMSType(value.toString());
+                } else if( "x-opt-to-type".equals(key) ) {
+                    // A null hint carries no information; keep the generic Destination default.
+                    if( value != null ) {
+                        toAttributes = toClassFromAttributes(value.toString());
+                    }
+                } else if( "x-opt-reply-type".equals(key) ) {
+                    if( value != null ) {
+                        replyToAttributes = toClassFromAttributes(value.toString());
+                    }
+                } else {
+                    setProperty(jms, prefixVendor + prefixMessageAnnotations + key, value);
+                }
+            }
+        }
+
+        final ApplicationProperties ap = amqp.getApplicationProperties();
+        if( ap !=null ) {
+            // Iterate as Object so this compiles whether getValue() is a raw or a generic map.
+            for (Object item : ap.getValue().entrySet()) {
+                Map.Entry<?,?> entry = (Map.Entry<?,?>) item;
+                String key = entry.getKey().toString();
+                Object value = entry.getValue();
+                if( "JMSXGroupID".equals(key) ) {
+                    if( value != null ) {
+                        vendor.setJMSXGroupID(jms, value.toString());
+                    }
+                } else if( "JMSXGroupSequence".equals(key) ) {
+                    if( value != null ) {
+                        vendor.setJMSXGroupSequence(jms, ((Number)value).intValue());
+                    }
+                } else if( "JMSXUserID".equals(key) ) {
+                    if( value != null ) {
+                        vendor.setJMSXUserID(jms, value.toString());
+                    }
+                } else {
+                    setProperty(jms, key, value);
+                }
+            }
+        }
+
+        final Properties properties = amqp.getProperties();
+        if( properties!=null ) {
+            if( properties.getMessageId()!=null ) {
+                jms.setJMSMessageID(properties.getMessageId().toString());
+            }
+            Binary userId = properties.getUserId();
+            if( userId!=null ) {
+                vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
+            }
+            if( properties.getTo()!=null ) {
+                jms.setJMSDestination(vendor.createDestination(properties.getTo(), toAttributes));
+            }
+            if( properties.getSubject()!=null ) {
+                jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
+            }
+            if( properties.getReplyTo() !=null ) {
+                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo(), replyToAttributes));
+            }
+            if( properties.getCorrelationId() !=null ) {
+                jms.setJMSCorrelationID(properties.getCorrelationId().toString());
+            }
+            if( properties.getContentType() !=null ) {
+                jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+            }
+            if( properties.getContentEncoding() !=null ) {
+                jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+            }
+            if( properties.getCreationTime()!=null ) {
+                jms.setJMSTimestamp(properties.getCreationTime().getTime());
+            }
+            if( properties.getGroupId()!=null ) {
+                vendor.setJMSXGroupID(jms, properties.getGroupId());
+            }
+            if( properties.getGroupSequence()!=null ) {
+                vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
+            }
+            if( properties.getReplyToGroupId()!=null ) {
+                jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+            }
+            if( properties.getAbsoluteExpiryTime()!=null ) {
+                jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
+            }
+        }
+
+        // If the jms expiration has not yet been set...
+        if( jms.getJMSExpiration()==0 ) {
+            // Then lets try to set it based on the message ttl.
+            long ttl = defaultTtl;
+            if( header.getTtl()!=null ) {
+                ttl = header.getTtl().longValue();
+            }
+            if( ttl == 0 ) {
+              jms.setJMSExpiration(0);
+            } else {
+                jms.setJMSExpiration(System.currentTimeMillis()+ttl);
+            }
+        }
+
+        final Footer fp = amqp.getFooter();
+        if( fp !=null ) {
+            for (Object item : fp.getValue().entrySet()) {
+                Map.Entry<?,?> entry = (Map.Entry<?,?>) item;
+                String key = entry.getKey().toString();
+                setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
+            }
+        }
+    }
+
+    private static final Set<String> QUEUE_ATTRIBUTES = createSet("queue"); // unmodifiable attribute set that toClassFromAttributes maps to Queue.class
+    private static final Set<String> TOPIC_ATTRIBUTES = createSet("topic"); // unmodifiable attribute set that toClassFromAttributes maps to Topic.class
+    private static final Set<String> TEMP_QUEUE_ATTRIBUTES = 
createSet("queue", "temporary"); // both attributes must be present for toClassFromAttributes to pick TemporaryQueue.class
+    private static final Set<String> TEMP_TOPIC_ATTRIBUTES = 
createSet("topic", "temporary"); // both attributes must be present for toClassFromAttributes to pick TemporaryTopic.class
+
+    private static Set<String> createSet(String ... args) {
+        HashSet<String> s = new HashSet<String>();
+        for (String arg : args)
+        {
+            s.add(arg);
+        }
+        return Collections.unmodifiableSet(s);
+    }
+
+    Class<? extends Destination> toClassFromAttributes(String value)
+    {
+        if( value ==null ) {
+            return null;
+        }
+        HashSet<String> attributes = new HashSet<String>();
+        for( String x: value.split("\\s*,\\s*") ) {
+            attributes.add(x);
+        }
+
+        if( QUEUE_ATTRIBUTES.equals(attributes) ) {
+            return Queue.class;
+        }
+        if( TOPIC_ATTRIBUTES.equals(attributes) ) {
+            return Topic.class;
+        }
+        if( TEMP_QUEUE_ATTRIBUTES.equals(attributes) ) {
+            return TemporaryQueue.class;
+        }
+        if( TEMP_TOPIC_ATTRIBUTES.equals(attributes) ) {
+            return TemporaryTopic.class;
+        }
+        return Destination.class;
+    }
+
+
+    private void setProperty(Message msg, String key, Object value) throws 
JMSException {
+        if( value instanceof UnsignedLong) {
+            long v = ((UnsignedLong) value).longValue();
+            msg.setLongProperty(key, v);
+        } else if( value instanceof UnsignedInteger) {
+            long v = ((UnsignedInteger) value).longValue();
+            if( Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE ) {
+                msg.setIntProperty(key, (int) v);
+            } else {
+                msg.setLongProperty(key, v);
+            }
+        } else if( value instanceof UnsignedShort) {
+            int v = ((UnsignedShort) value).intValue();
+            if( Short.MIN_VALUE <= v && v <= Short.MAX_VALUE ) {
+                msg.setShortProperty(key, (short) v);
+            } else {
+                msg.setIntProperty(key, v);
+            }
+        } else if( value instanceof UnsignedByte) {
+            short v = ((UnsignedByte) value).shortValue();
+            if( Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE ) {
+                msg.setByteProperty(key, (byte) v);
+            } else {
+                msg.setShortProperty(key, v);
+            }
+        } else if( value instanceof Symbol) {
+            msg.setStringProperty(key, value.toString());
+        } else if( value instanceof Decimal128 ) {
+            msg.setDoubleProperty(key, ((Decimal128)value).doubleValue());
+        } else if( value instanceof Decimal64 ) {
+            msg.setDoubleProperty(key, ((Decimal64)value).doubleValue());
+        } else if( value instanceof Decimal32 ) {
+            msg.setFloatProperty(key, ((Decimal32)value).floatValue());
+        } else if( value instanceof Binary ) {
+            msg.setStringProperty(key, value.toString());
+        } else {
+            msg.setObjectProperty(key, value);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to