Work-in-progress.
Changed ConnectionImpl, TransportSession and duplicated SaslImpl, 
TransportFrame, FrameWriter and handler to support having the new and old 
transport sit side by side until we switch permanently.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cf29c4ae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cf29c4ae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cf29c4ae

Branch: refs/heads/rajith-codec
Commit: cf29c4aed9620415b1b3b20f777a83d52fd475ae
Parents: 2cadd86
Author: Rajith Attapattu <[email protected]>
Authored: Fri May 15 09:20:31 2015 -0400
Committer: Rajith Attapattu <[email protected]>
Committed: Fri May 15 09:20:31 2015 -0400

----------------------------------------------------------------------
 .../qpid/proton/engine/impl/ConnectionImpl.java |   11 +-
 .../qpid/proton/engine/impl/EndpointImpl.java   |    3 +-
 .../qpid/proton/engine/impl/FrameHandler2.java  |   45 +
 .../qpid/proton/engine/impl/FrameWriter2.java   |   17 +-
 .../qpid/proton/engine/impl/SaslImpl2.java      |  707 ++++++++
 .../qpid/proton/engine/impl/TransportImpl2.java | 1670 ++++++++++++++++++
 .../proton/engine/impl/TransportSession.java    |    5 +-
 .../qpid/proton/framing/TransportFrame2.java    |   63 +
 8 files changed, 2511 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf29c4ae/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
index 17ffde7..96e8891 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
@@ -49,7 +49,7 @@ public class ConnectionImpl extends EndpointImpl implements 
ProtonJConnection
     private DeliveryImpl _workHead;
     private DeliveryImpl _workTail;
 
-    private TransportImpl _transport;
+    private Transport _transport;
     private DeliveryImpl _transportWorkHead;
     private DeliveryImpl _transportWorkTail;
     private int _transportWorkSize = 0;
@@ -225,6 +225,11 @@ public class ConnectionImpl extends EndpointImpl 
implements ProtonJConnection
         put(Event.Type.CONNECTION_REMOTE_OPEN, this);
     }
 
+    //Added by Rajith as temp hack in support of TransportFrame2
+    void addConnectionOpenEvent()
+    {
+        put(Event.Type.CONNECTION_REMOTE_OPEN, this);
+    }
 
     EndpointImpl getTransportHead()
     {
@@ -466,12 +471,12 @@ public class ConnectionImpl extends EndpointImpl 
implements ProtonJConnection
         return new WorkSequence(_workHead);
     }
 
-    void setTransport(TransportImpl transport)
+    void setTransport(Transport transport)
     {
         _transport = transport;
     }
 
-    public TransportImpl getTransport()
+    public Transport getTransport()
     {
         return _transport;
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf29c4ae/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
index b97793a..5822673 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
@@ -25,6 +25,7 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.ProtonJEndpoint;
+import org.apache.qpid.proton.engine.Transport;
 
 public abstract class EndpointImpl implements ProtonJEndpoint
 {
@@ -149,7 +150,7 @@ public abstract class EndpointImpl implements 
ProtonJEndpoint
 
         if (emit) {
             ConnectionImpl conn = getConnectionImpl();
-            TransportImpl trans = conn.getTransport();
+            Transport trans = conn.getTransport();
             if (trans != null) {
                 conn.put(Event.Type.TRANSPORT, trans);
             }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf29c4ae/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler2.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler2.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler2.java
new file mode 100644
index 0000000..ba018f2
--- /dev/null
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler2.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import org.apache.qpid.proton.engine.TransportException;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.framing.TransportFrame2;
+
+public interface FrameHandler2
+{
+    /**
+     * @throws IllegalStateException
+     *             if I am not currently accepting input
+     * @see #isHandlingFrames()
+     * @returns false on end of stream
+     */
+    boolean handleFrame(TransportFrame2 frame);
+
+    void closed(TransportException error);
+
+    /**
+     * Returns whether I am currently able to handle frames. MUST be checked
+     * before calling {@link #handleFrame(TransportFrame)}.
+     */
+    boolean isHandlingFrames();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf29c4ae/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java
index 109fecf..9d1b432 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java
@@ -31,6 +31,7 @@ import org.apache.qpid.proton.codec2.ByteArrayEncoder;
 import org.apache.qpid.proton.codec2.CodecHelper;
 import org.apache.qpid.proton.codec2.Type;
 import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.transport2.Performative;
 
 /**
  * FrameWriter2
@@ -58,7 +59,7 @@ class FrameWriter2
 
     final private Ref<ProtocolTracer> _protocolTracer;
 
-    private TransportImpl _transport;
+    private TransportImpl2 _transport;
 
     private int _frameStart = 0;
 
@@ -70,8 +71,10 @@ class FrameWriter2
 
     private int _read = 0;
 
+    private long _framesOutput = 0;
+
     FrameWriter2(ByteArrayEncoder encoder, int maxFrameSize, byte frameType, 
Ref<ProtocolTracer> protocolTracer,
-            TransportImpl transport)
+            TransportImpl2 transport)
     {
         _encoder = encoder;
         _encoder.init(_buffer, 0, _buffer.length);
@@ -143,7 +146,7 @@ class FrameWriter2
         _encoder.putShort((short) channel);
     }
 
-    void writeFrame(int channel, Object frameBody, ByteBuffer payload, 
Runnable onPayloadTooLarge)
+    void writeFrame(int channel, Performative frameBody, ByteBuffer payload, 
Runnable onPayloadTooLarge)
     {
         startFrame();
 
@@ -200,9 +203,10 @@ class FrameWriter2
             _position = _position + payloadSize + 1;
         }
         endFrame(channel);
+        _framesOutput += 1;
     }
 
-    void writeFrame(Object frameBody)
+    void writeFrame(Performative frameBody)
     {
         writeFrame(0, frameBody, null, null);
     }
@@ -232,4 +236,9 @@ class FrameWriter2
 
         return size;
     }
+
+    long getFramesOutput()
+    {
+        return _framesOutput;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf29c4ae/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl2.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl2.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl2.java
new file mode 100644
index 0000000..3389789
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl2.java
@@ -0,0 +1,707 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.qpid.proton.engine.impl;
+
+import static 
org.apache.qpid.proton.engine.impl.ByteBufferUtils.newWriteableBuffer;
+import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourAll;
+import static 
org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArray;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.ProtonUnsupportedOperationException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.security.SaslChallenge;
+import org.apache.qpid.proton.amqp.security.SaslCode;
+import org.apache.qpid.proton.amqp.security.SaslFrameBody;
+import org.apache.qpid.proton.amqp.security.SaslInit;
+import org.apache.qpid.proton.amqp.security.SaslMechanisms;
+import org.apache.qpid.proton.amqp.security.SaslResponse;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.TransportException;
+
+public class SaslImpl2 implements Sasl, 
SaslFrameBody.SaslFrameBodyHandler<Void>, SaslFrameHandler
+{
+    private static final Logger _logger = 
Logger.getLogger(SaslImpl2.class.getName());
+
+    public static final byte SASL_FRAME_TYPE = (byte) 1;
+
+    private final DecoderImpl _decoder = new DecoderImpl();
+    private final EncoderImpl _encoder = new EncoderImpl(_decoder);
+
+    private final TransportImpl2 _transport;
+
+    private boolean _tail_closed = false;
+    private final ByteBuffer _inputBuffer;
+    private boolean _head_closed = false;
+    private final ByteBuffer _outputBuffer;
+    private final FrameWriter _frameWriter;
+
+    private ByteBuffer _pending;
+
+    private boolean _headerWritten;
+    private Binary _challengeResponse;
+    private SaslFrameParser _frameParser;
+    private boolean _initReceived;
+    private boolean _mechanismsSent;
+    private boolean _initSent;
+
+    enum Role { CLIENT, SERVER };
+
+    private SaslOutcome _outcome = SaslOutcome.PN_SASL_NONE;
+    private SaslState _state = SaslState.PN_SASL_IDLE;
+
+    private String _hostname;
+    private boolean _done;
+    private Symbol[] _mechanisms;
+
+    private Symbol _chosenMechanism;
+
+    private Role _role;
+
+    /**
+     * @param maxFrameSize the size of the input and output buffers
+     * returned by {@link SaslTransportWrapper#getInputBuffer()} and
+     * {@link SaslTransportWrapper#getOutputBuffer()}.
+     */
+    SaslImpl2(TransportImpl2 transport, int maxFrameSize)
+    {
+        _transport = transport;
+        _inputBuffer = newWriteableBuffer(maxFrameSize);
+        _outputBuffer = newWriteableBuffer(maxFrameSize);
+
+        AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
+        _frameParser = new SaslFrameParser(this, _decoder);
+        _frameWriter = new FrameWriter(_encoder, maxFrameSize, 
FrameWriter.SASL_FRAME_TYPE, null, _transport);
+    }
+
+    void fail() {
+        if (_role == null || _role == Role.CLIENT) {
+            _role = Role.CLIENT;
+            _initSent = true;
+        } else {
+            _initReceived = true;
+
+        }
+        _done = true;
+        _outcome = SaslOutcome.PN_SASL_SYS;
+    }
+
+    @Override
+    public boolean isDone()
+    {
+        return _done && (_role==Role.CLIENT || _initReceived);
+    }
+
+    private void writeSaslOutput()
+    {
+        process();
+        _frameWriter.readBytes(_outputBuffer);
+
+        if(_logger.isLoggable(Level.FINER))
+        {
+            _logger.log(Level.FINER, "Finished writing SASL output. Output 
Buffer : " + _outputBuffer);
+        }
+    }
+
+    private void process()
+    {
+        processHeader();
+
+        if(_role == Role.SERVER)
+        {
+            if(!_mechanismsSent && _mechanisms != null)
+            {
+                SaslMechanisms mechanisms = new SaslMechanisms();
+
+                mechanisms.setSaslServerMechanisms(_mechanisms);
+                writeFrame(mechanisms);
+                _mechanismsSent = true;
+                _state = SaslState.PN_SASL_STEP;
+            }
+
+            if(getState() == SaslState.PN_SASL_STEP && getChallengeResponse() 
!= null)
+            {
+                SaslChallenge challenge = new SaslChallenge();
+                challenge.setChallenge(getChallengeResponse());
+                writeFrame(challenge);
+                setChallengeResponse(null);
+            }
+
+            if(_done)
+            {
+                org.apache.qpid.proton.amqp.security.SaslOutcome outcome =
+                        new org.apache.qpid.proton.amqp.security.SaslOutcome();
+                outcome.setCode(SaslCode.values()[_outcome.getCode()]);
+                writeFrame(outcome);
+            }
+        }
+        else if(_role == Role.CLIENT)
+        {
+            if(getState() == SaslState.PN_SASL_IDLE && _chosenMechanism != 
null)
+            {
+                processInit();
+                _state = SaslState.PN_SASL_STEP;
+
+                //HACK: if we received an outcome before
+                //we sent our init, change the state now
+                if(_outcome != SaslOutcome.PN_SASL_NONE)
+                {
+                    _state = classifyStateFromOutcome(_outcome);
+                }
+            }
+
+            if(getState() == SaslState.PN_SASL_STEP && getChallengeResponse() 
!= null)
+            {
+                processResponse();
+            }
+        }
+    }
+
+    private void writeFrame(SaslFrameBody frameBody)
+    {
+        _frameWriter.writeFrame(frameBody);
+    }
+
+    @Override
+    final public int recv(byte[] bytes, int offset, int size)
+    {
+        if(_pending == null)
+        {
+            return -1;
+        }
+        final int written = pourBufferToArray(_pending, bytes, offset, size);
+        if(!_pending.hasRemaining())
+        {
+            _pending = null;
+        }
+        return written;
+    }
+
+    @Override
+    final public int send(byte[] bytes, int offset, int size)
+    {
+        byte[] data = new byte[size];
+        System.arraycopy(bytes, offset, data, 0, size);
+        setChallengeResponse(new Binary(data));
+        return size;
+    }
+
+    final int processHeader()
+    {
+        if(!_headerWritten)
+        {
+            _frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
+
+            _headerWritten = true;
+            return AmqpHeader.SASL_HEADER.length;
+        }
+        else
+        {
+            return 0;
+        }
+    }
+
+    @Override
+    public int pending()
+    {
+        return _pending == null ? 0 : _pending.remaining();
+    }
+
+    void setPending(ByteBuffer pending)
+    {
+        _pending = pending;
+    }
+
+    @Override
+    public SaslState getState()
+    {
+        return _state;
+    }
+
+    final Binary getChallengeResponse()
+    {
+        return _challengeResponse;
+    }
+
+    final void setChallengeResponse(Binary challengeResponse)
+    {
+        _challengeResponse = challengeResponse;
+    }
+
+    @Override
+    public void setMechanisms(String... mechanisms)
+    {
+        if(mechanisms != null)
+        {
+            _mechanisms = new Symbol[mechanisms.length];
+            for(int i = 0; i < mechanisms.length; i++)
+            {
+                _mechanisms[i] = Symbol.valueOf(mechanisms[i]);
+            }
+        }
+
+        if(_role == Role.CLIENT)
+        {
+            assert mechanisms != null;
+            assert mechanisms.length == 1;
+
+            _chosenMechanism = Symbol.valueOf(mechanisms[0]);
+        }
+    }
+
+    @Override
+    public String[] getRemoteMechanisms()
+    {
+        if(_role == Role.SERVER)
+        {
+            return _chosenMechanism == null ? new String[0] : new String[] { 
_chosenMechanism.toString() };
+        }
+        else if(_role == Role.CLIENT)
+        {
+            if(_mechanisms == null)
+            {
+                return new String[0];
+            }
+            else
+            {
+                String[] remoteMechanisms = new String[_mechanisms.length];
+                for(int i = 0; i < _mechanisms.length; i++)
+                {
+                    remoteMechanisms[i] = _mechanisms[i].toString();
+                }
+                return remoteMechanisms;
+            }
+        }
+        else
+        {
+            throw new IllegalStateException();
+        }
+    }
+
+    public void setMechanism(Symbol mechanism)
+    {
+        _chosenMechanism = mechanism;
+    }
+
+    public Symbol getChosenMechanism()
+    {
+        return _chosenMechanism;
+    }
+
+    public void setResponse(Binary initialResponse)
+    {
+        setPending(initialResponse.asByteBuffer());
+    }
+
+    @Override
+    public void handle(SaslFrameBody frameBody, Binary payload)
+    {
+        frameBody.invoke(this, payload, null);
+    }
+
+    @Override
+    public void handleInit(SaslInit saslInit, Binary payload, Void context)
+    {
+        if(_role == null)
+        {
+            server();
+        }
+        checkRole(Role.SERVER);
+        _hostname = saslInit.getHostname();
+        _chosenMechanism = saslInit.getMechanism();
+        _initReceived = true;
+        if(saslInit.getInitialResponse() != null)
+        {
+            setPending(saslInit.getInitialResponse().asByteBuffer());
+        }
+    }
+
+    @Override
+    public void handleResponse(SaslResponse saslResponse, Binary payload, Void 
context)
+    {
+        checkRole(Role.SERVER);
+        setPending(saslResponse.getResponse()  == null ? null : 
saslResponse.getResponse().asByteBuffer());
+    }
+
+    @Override
+    public void done(SaslOutcome outcome)
+    {
+        checkRole(Role.SERVER);
+        _outcome = outcome;
+        _done = true;
+        _state = classifyStateFromOutcome(outcome);
+        _logger.fine("SASL negotiation done: " + this);
+    }
+
+    private void checkRole(Role role)
+    {
+        if(role != _role)
+        {
+            throw new IllegalStateException("Role is " + _role + " but should 
be " + role);
+        }
+    }
+
+    @Override
+    public void handleMechanisms(SaslMechanisms saslMechanisms, Binary 
payload, Void context)
+    {
+        if(_role == null)
+        {
+            client();
+        }
+        checkRole(Role.CLIENT);
+        _mechanisms = saslMechanisms.getSaslServerMechanisms();
+    }
+
+    @Override
+    public void handleChallenge(SaslChallenge saslChallenge, Binary payload, 
Void context)
+    {
+        checkRole(Role.CLIENT);
+        setPending(saslChallenge.getChallenge()  == null ? null : 
saslChallenge.getChallenge().asByteBuffer());
+    }
+
+    @Override
+    public void handleOutcome(org.apache.qpid.proton.amqp.security.SaslOutcome 
saslOutcome,
+                              Binary payload,
+                              Void context)
+    {
+        checkRole(Role.CLIENT);
+        for(SaslOutcome outcome : SaslOutcome.values())
+        {
+            if(outcome.getCode() == saslOutcome.getCode().ordinal())
+            {
+                _outcome = outcome;
+                if (_state != SaslState.PN_SASL_IDLE)
+                {
+                    _state = classifyStateFromOutcome(outcome);
+                }
+                break;
+            }
+        }
+        _done = true;
+
+        if(_logger.isLoggable(Level.FINE))
+        {
+            _logger.fine("Handled outcome: " + this);
+        }
+    }
+
+    private SaslState classifyStateFromOutcome(SaslOutcome outcome)
+    {
+        return outcome == SaslOutcome.PN_SASL_OK ? SaslState.PN_SASL_PASS : 
SaslState.PN_SASL_FAIL;
+    }
+
+    private void processResponse()
+    {
+        SaslResponse response = new SaslResponse();
+        response.setResponse(getChallengeResponse());
+        setChallengeResponse(null);
+        writeFrame(response);
+    }
+
+    private void processInit()
+    {
+        SaslInit init = new SaslInit();
+        init.setHostname(_hostname);
+        init.setMechanism(_chosenMechanism);
+        if(getChallengeResponse() != null)
+        {
+            init.setInitialResponse(getChallengeResponse());
+            setChallengeResponse(null);
+        }
+        _initSent = true;
+        writeFrame(init);
+    }
+
+    @Override
+    public void plain(String username, String password)
+    {
+        client();
+        _chosenMechanism = Symbol.valueOf("PLAIN");
+        byte[] usernameBytes = username.getBytes();
+        byte[] passwordBytes = password.getBytes();
+        byte[] data = new byte[usernameBytes.length+passwordBytes.length+2];
+        System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
+        System.arraycopy(passwordBytes, 0, data, 2+usernameBytes.length, 
passwordBytes.length);
+
+        setChallengeResponse(new Binary(data));
+    }
+
+    @Override
+    public SaslOutcome getOutcome()
+    {
+        return _outcome;
+    }
+
+    @Override
+    public void client()
+    {
+        _role = Role.CLIENT;
+        if(_mechanisms != null)
+        {
+            assert _mechanisms.length == 1;
+
+            _chosenMechanism = _mechanisms[0];
+        }
+    }
+
+    @Override
+    public void server()
+    {
+        _role = Role.SERVER;
+    }
+
+    @Override
+    public void allowSkip(boolean allowSkip)
+    {
+        //TODO: implement
+        throw new ProtonUnsupportedOperationException();
+    }
+
+    public TransportWrapper wrap(final TransportInput input, final 
TransportOutput output)
+    {
+        return new SaslTransportWrapper(input, output);
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder builder = new StringBuilder();
+        builder
+            .append("SaslImpl [_outcome=").append(_outcome)
+            .append(", state=").append(_state)
+            .append(", done=").append(_done)
+            .append(", role=").append(_role)
+            .append("]");
+        return builder.toString();
+    }
+
+    private class SaslTransportWrapper implements TransportWrapper
+    {
+        private final TransportInput _underlyingInput;
+        private final TransportOutput _underlyingOutput;
+        private boolean _outputComplete;
+        private final ByteBuffer _head;
+
+        private SaslTransportWrapper(TransportInput input, TransportOutput 
output)
+        {
+            _underlyingInput = input;
+            _underlyingOutput = output;
+            _head = _outputBuffer.asReadOnlyBuffer();
+            _head.limit(0);
+        }
+
+        private void fillOutputBuffer()
+        {
+            if(isOutputInSaslMode())
+            {
+                SaslImpl2.this.writeSaslOutput();
+                if(_done)
+                {
+                    _outputComplete = true;
+                }
+            }
+        }
+
+        /**
+         * TODO rationalise this method with respect to the other similar 
checks of _role/_initReceived etc
+         * @see SaslImpl2#isDone()
+         */
+        private boolean isInputInSaslMode()
+        {
+            return _role == null || (_role == Role.CLIENT && !_done) ||(_role 
== Role.SERVER && (!_initReceived || !_done));
+        }
+
+        private boolean isOutputInSaslMode()
+        {
+            return _role == null || (_role == Role.CLIENT && (!_done || 
!_initSent)) || (_role == Role.SERVER && !_outputComplete);
+        }
+
+        @Override
+        public int capacity()
+        {
+            if (_tail_closed) return Transport.END_OF_STREAM;
+            if (isInputInSaslMode())
+            {
+                return _inputBuffer.remaining();
+            }
+            else
+            {
+                return _underlyingInput.capacity();
+            }
+        }
+
+        @Override
+        public int position()
+        {
+            if (_tail_closed) return Transport.END_OF_STREAM;
+            if (isInputInSaslMode())
+            {
+                return _inputBuffer.position();
+            }
+            else
+            {
+                return _underlyingInput.position();
+            }
+        }
+
+        @Override
+        public ByteBuffer tail()
+        {
+            if (!isInputInSaslMode())
+            {
+                return _underlyingInput.tail();
+            }
+
+            return _inputBuffer;
+        }
+
+        @Override
+        public void process() throws TransportException
+        {
+            _inputBuffer.flip();
+
+            try
+            {
+                reallyProcessInput();
+            }
+            finally
+            {
+                _inputBuffer.compact();
+            }
+        }
+
+        @Override
+        public void close_tail()
+        {
+            _tail_closed = true;
+            if (isInputInSaslMode()) {
+                _head_closed = true;
+                _underlyingInput.close_tail();
+            } else {
+                _underlyingInput.close_tail();
+            }
+        }
+
+        private void reallyProcessInput() throws TransportException
+        {
+            if(isInputInSaslMode())
+            {
+                if(_logger.isLoggable(Level.FINER))
+                {
+                    _logger.log(Level.FINER, SaslImpl2.this + " about to call 
input.");
+                }
+
+                _frameParser.input(_inputBuffer);
+            }
+
+            if(!isInputInSaslMode())
+            {
+                if(_logger.isLoggable(Level.FINER))
+                {
+                    _logger.log(Level.FINER, SaslImpl2.this + " about to call 
plain input");
+                }
+
+                if (_inputBuffer.hasRemaining())
+                {
+                    int bytes = pourAll(_inputBuffer, _underlyingInput);
+                    if (bytes == Transport.END_OF_STREAM)
+                    {
+                        _tail_closed = true;
+                    }
+
+                    _underlyingInput.process();
+                }
+                else
+                {
+                    _underlyingInput.process();
+                }
+            }
+        }
+
+        @Override
+        public int pending()
+        {
+            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
+            {
+                fillOutputBuffer();
+                _head.limit(_outputBuffer.position());
+
+                if (_head_closed && _outputBuffer.position() == 0)
+                {
+                    return Transport.END_OF_STREAM;
+                }
+                else
+                {
+                    return _outputBuffer.position();
+                }
+            }
+            else
+            {
+                return _underlyingOutput.pending();
+            }
+        }
+
+        @Override
+        public ByteBuffer head()
+        {
+            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
+            {
+                pending();
+                return _head;
+            }
+            else
+            {
+                return _underlyingOutput.head();
+            }
+        }
+
+        @Override
+        public void pop(int bytes)
+        {
+            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
+            {
+                _outputBuffer.flip();
+                _outputBuffer.position(bytes);
+                _outputBuffer.compact();
+                _head.position(0);
+                _head.limit(_outputBuffer.position());
+            }
+            else
+            {
+                _underlyingOutput.pop(bytes);
+            }
+        }
+
+        @Override
+        public void close_head()
+        {
+            _underlyingOutput.close_head();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf29c4ae/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl2.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl2.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl2.java
new file mode 100644
index 0000000..6cf5a96
--- /dev/null
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl2.java
@@ -0,0 +1,1670 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.engine.impl;
+
+import static 
org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourArrayToBuffer;
+import static 
org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArray;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.transport2.Attach;
+import org.apache.qpid.proton.transport2.Begin;
+import org.apache.qpid.proton.transport2.Close;
+import org.apache.qpid.proton.transport2.ConnectionError;
+import org.apache.qpid.proton.transport2.Detach;
+import org.apache.qpid.proton.transport2.Disposition;
+import org.apache.qpid.proton.transport2.End;
+import org.apache.qpid.proton.transport2.ErrorCondition;
+import org.apache.qpid.proton.transport2.Flow;
+import org.apache.qpid.proton.transport2.Open;
+import org.apache.qpid.proton.transport2.Performative;
+import org.apache.qpid.proton.transport2.Role;
+import org.apache.qpid.proton.transport2.Transfer;
+import org.apache.qpid.proton.codec2.ByteArrayDecoder;
+import org.apache.qpid.proton.codec2.ByteArrayEncoder;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.ProtonJTransport;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Ssl;
+import org.apache.qpid.proton.engine.SslDomain;
+import org.apache.qpid.proton.engine.SslPeerDetails;
+import org.apache.qpid.proton.engine.TransportException;
+import org.apache.qpid.proton.engine.TransportResult;
+import org.apache.qpid.proton.engine.TransportResultFactory;
+import org.apache.qpid.proton.engine.impl.ssl.SslImpl;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.framing.TransportFrame2;
+
+public class TransportImpl2 extends EndpointImpl
+    implements ProtonJTransport, FrameHandler2, TransportOutputWriter
+{
+    static final int BUFFER_RELEASE_THRESHOLD = 
Integer.getInteger("proton.transport_buffer_release_threshold", 2 * 1024 * 
1024);
+
+    private static final boolean getBooleanEnv(String name)
+    {
+        String value = System.getenv(name);
+        return "true".equalsIgnoreCase(value) ||
+            "1".equals(value) ||
+            "yes".equalsIgnoreCase(value);
+    }
+
+    private static final boolean FRM_ENABLED = getBooleanEnv("PN_TRACE_FRM");
+    private static final int TRACE_FRAME_PAYLOAD_LENGTH = 
Integer.getInteger("proton.trace_frame_payload_length", 80);
+
+    // trace levels
+    private int _levels = (FRM_ENABLED ? this.TRACE_FRM : 0);
+
+    private FrameParser _frameParser;
+
+    private ConnectionImpl _connectionEndpoint;
+
+    private boolean _isOpenSent;
+    private boolean _isCloseSent;
+
+    private boolean _headerWritten;
+    private Map<Integer, TransportSession> _remoteSessions = new 
HashMap<Integer, TransportSession>();
+    private Map<Integer, TransportSession> _localSessions = new 
HashMap<Integer, TransportSession>();
+
+    private TransportInput _inputProcessor;
+    private TransportOutput _outputProcessor;
+
+    private ByteArrayDecoder _decoder = new ByteArrayDecoder();
+    private ByteArrayEncoder _encoder = new ByteArrayEncoder();
+
+    private int _maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
+    private int _remoteMaxFrameSize = 512;
+    private int _channelMax = 65535;
+    private int _remoteChannelMax = 65535;
+
+    private final FrameWriter2 _frameWriter;
+
+    private boolean _closeReceived;
+    private Open _open;
+    private SaslImpl2 _sasl;
+    private SslImpl _ssl;
+    private final Ref<ProtocolTracer> _protocolTracer = new Ref(null);
+
+    private TransportResult _lastTransportResult = TransportResultFactory.ok();
+
+    private boolean _init;
+    private boolean _processingStarted;
+
+    private FrameHandler2 _frameHandler = this;
+    private boolean _head_closed = false;
+    private ErrorCondition _condition = null;
+
+    private boolean postedHeadClosed = false;
+    private boolean postedTailClosed = false;
+
+    private int _localIdleTimeout = 0;
+    private int _remoteIdleTimeout = 0;
+    private long _bytesInput = 0;
+    private long _bytesOutput = 0;
+    private long _localIdleDeadline = 0;
+    private long _lastBytesInput = 0;
+    private long _lastBytesOutput = 0;
+    private long _remoteIdleDeadline = 0;
+
+    /**
+     * @deprecated This constructor's visibility will be reduced to the 
default scope in a future release.
+     * Client code outside this module should use a {@link EngineFactory} 
instead
+     */
+    @Deprecated public TransportImpl2()
+    {
+        this(DEFAULT_MAX_FRAME_SIZE);
+    }
+
+
+    /**
+     * Creates a transport with the given maximum frame size.
+     * Note that the maximumFrameSize also determines the size of the output 
buffer.
+     */
+    TransportImpl2(int maxFrameSize)
+    {
+        _maxFrameSize = maxFrameSize;
+        _frameWriter = new FrameWriter2(_encoder, _remoteMaxFrameSize,
+                        FrameWriter.AMQP_FRAME_TYPE,
+                        _protocolTracer,
+                        this);
+    }
+
+    private void init()
+    {
+        if(!_init)
+        {
+            _init = true;
+            _frameParser = new FrameParser(_frameHandler , _decoder, 
_maxFrameSize);
+            _inputProcessor = _frameParser;
+            _outputProcessor = new TransportOutputAdaptor(this, _maxFrameSize);
+        }
+    }
+
+    @Override
+    public void trace(int levels) {
+        _levels = levels;
+    }
+
+    @Override
+    public int getMaxFrameSize()
+    {
+        return _maxFrameSize;
+    }
+
+    @Override
+    public int getRemoteMaxFrameSize()
+    {
+        return _remoteMaxFrameSize;
+    }
+
+    /**
+     * TODO propagate the new value to {@link #_outputProcessor} etc
+     */
+    @Override
+    public void setMaxFrameSize(int maxFrameSize)
+    {
+        if(_init)
+        {
+            throw new IllegalStateException("Cannot set max frame size after 
transport has been initialised");
+        }
+        _maxFrameSize = maxFrameSize;
+    }
+
+    @Override
+    public int getChannelMax()
+    {
+        return _channelMax;
+    }
+
+    @Override
+    public void setChannelMax(int n)
+    {
+        _channelMax = n;
+    }
+
+    @Override
+    public int getRemoteChannelMax()
+    {
+        return _remoteChannelMax;
+    }
+
+    @Override
+    public ErrorCondition getCondition()
+    {
+        return _condition;
+    }
+
+    @Override
+    public void bind(Connection conn)
+    {
+        // TODO - check if already bound
+
+        _connectionEndpoint = (ConnectionImpl) conn;
+        put(Event.Type.CONNECTION_BOUND, conn);
+        _connectionEndpoint.setTransport(this);
+        _connectionEndpoint.incref();
+
+        if(getRemoteState() != EndpointState.UNINITIALIZED)
+        {
+            //_connectionEndpoint.handleOpen(_open);
+            handleOpen(_connectionEndpoint, _open);
+            if(getRemoteState() == EndpointState.CLOSED)
+            {
+                _connectionEndpoint.setRemoteState(EndpointState.CLOSED);
+            }
+
+            _frameParser.flush();
+        }
+    }
+
+    @Override
+    public void unbind()
+    {
+        for (TransportSession ts: _localSessions.values()) {
+            ts.unbind();
+        }
+        for (TransportSession ts: _remoteSessions.values()) {
+            ts.unbind();
+        }
+
+        put(Event.Type.CONNECTION_UNBOUND, _connectionEndpoint);
+
+        _connectionEndpoint.modifyEndpoints();
+        _connectionEndpoint.setTransport(null);
+        _connectionEndpoint.decref();
+    }
+
+    @Override
+    public int input(byte[] bytes, int offset, int length)
+    {
+        oldApiCheckStateBeforeInput(length).checkIsOk();
+
+        ByteBuffer inputBuffer = getInputBuffer();
+        int numberOfBytesConsumed = pourArrayToBuffer(bytes, offset, length, 
inputBuffer);
+        processInput().checkIsOk();
+        return numberOfBytesConsumed;
+    }
+
+    /**
+     * This method is public as it is used by Python layer.
+     * @see Transport#input(byte[], int, int)
+     */
+    public TransportResult oldApiCheckStateBeforeInput(int inputLength)
+    {
+        _lastTransportResult.checkIsOk();
+        if(inputLength == 0)
+        {
+            if(_connectionEndpoint == null || 
_connectionEndpoint.getRemoteState() != EndpointState.CLOSED)
+            {
+                return TransportResultFactory.error(new 
TransportException("Unexpected EOS when remote connection not closed: 
connection aborted"));
+            }
+        }
+        return TransportResultFactory.ok();
+    }
+
+    
//==================================================================================================================
+    // Process model state to generate output
+
+    @Override
+    public int output(byte[] bytes, final int offset, final int size)
+    {
+        ByteBuffer outputBuffer = getOutputBuffer();
+        int numberOfBytesOutput = pourBufferToArray(outputBuffer, bytes, 
offset, size);
+        outputConsumed();
+        return numberOfBytesOutput;
+    }
+
+    @Override
+    public boolean writeInto(ByteBuffer outputBuffer)
+    {
+        processHeader();
+        processOpen();
+        processBegin();
+        processAttach();
+        processReceiverFlow();
+        // we process transport work twice intentionally, the first
+        // pass may end up settling deliveries that the second pass
+        // can clean up
+        processTransportWork();
+        processTransportWork();
+        processSenderFlow();
+        processDetach();
+        processEnd();
+        processClose();
+
+        _frameWriter.readBytes(outputBuffer);
+
+        return _isCloseSent || _head_closed;
+    }
+
+    @Override
+    public Sasl sasl()
+    {
+        if(_sasl == null)
+        {
+            if(_processingStarted)
+            {
+                throw new IllegalStateException("Sasl can't be initiated after 
transport has started processing");
+            }
+
+            init();
+            _sasl = new SaslImpl2(this, _remoteMaxFrameSize);
+            TransportWrapper transportWrapper = _sasl.wrap(_inputProcessor, 
_outputProcessor);
+            _inputProcessor = transportWrapper;
+            _outputProcessor = transportWrapper;
+        }
+        return _sasl;
+
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Note that sslDomain must implement {@link ProtonSslEngineProvider}. 
This is not possible
+     * enforce at the API level because {@link ProtonSslEngineProvider} is not 
part of the
+     * public Proton API.</p>
+     */
+    @Override
+    public Ssl ssl(SslDomain sslDomain, SslPeerDetails sslPeerDetails)
+    {
+        if (_ssl == null)
+        {
+            init();
+            _ssl = new SslImpl(sslDomain, sslPeerDetails);
+            TransportWrapper transportWrapper = _ssl.wrap(_inputProcessor, 
_outputProcessor);
+            _inputProcessor = transportWrapper;
+            _outputProcessor = transportWrapper;
+        }
+        return _ssl;
+    }
+
+    @Override
+    public Ssl ssl(SslDomain sslDomain)
+    {
+        return ssl(sslDomain, null);
+    }
+
+    private void processDetach()
+    {
+        if(_connectionEndpoint != null)
+        {
+            EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+            while(endpoint != null)
+            {
+
+                if(endpoint instanceof LinkImpl)
+                {
+                    LinkImpl link = (LinkImpl) endpoint;
+                    TransportLink<?> transportLink = getTransportState(link);
+                    SessionImpl session = link.getSession();
+                    TransportSession transportSession = 
getTransportState(session);
+
+                    if(((link.getLocalState() == EndpointState.CLOSED) || 
link.detached())
+                       && transportLink.isLocalHandleSet()
+                       && transportSession.isLocalChannelSet()
+                       && !_isCloseSent)
+                    {
+                        if((link instanceof SenderImpl)
+                           && link.getQueued() > 0
+                           && !transportLink.detachReceived()
+                           && !transportSession.endReceived()
+                           && !_closeReceived) {
+                            endpoint = endpoint.transportNext();
+                            continue;
+                        }
+
+                        UnsignedInteger localHandle = 
transportLink.getLocalHandle();
+                        transportLink.clearLocalHandle();
+                        transportSession.freeLocalHandle(localHandle);
+
+
+                        Detach detach = new Detach();
+                        detach.setHandle(localHandle.intValue());
+                        detach.setClosed(!link.detached());
+
+                        ErrorCondition localError = convertEndpointError(link);
+                        if( localError.getCondition() !=null )
+                        {
+                            detach.setError(localError);
+                        }
+
+
+                        writeFrame(transportSession.getLocalChannel(), detach, 
null, null);
+                    }
+
+                    endpoint.clearModified();
+
+                }
+                endpoint = endpoint.transportNext();
+            }
+        }
+    }
+
+    private void writeFlow(TransportSession ssn, TransportLink link)
+    {
+        Flow flow = new Flow();
+        flow.setNextIncomingId(ssn.getNextIncomingId().intValue());
+        flow.setNextOutgoingId(ssn.getNextOutgoingId().intValue());
+        ssn.updateWindows();
+        flow.setIncomingWindow(ssn.getIncomingWindowSize().intValue());
+        flow.setOutgoingWindow(ssn.getOutgoingWindowSize().intValue());
+        if (link != null) {
+            flow.setHandle(link.getLocalHandle().intValue());
+            flow.setDeliveryCount(link.getDeliveryCount().intValue());
+            flow.setLinkCredit(link.getLinkCredit().intValue());
+            flow.setDrain(link.getLink().getDrain());
+        }
+        writeFrame(ssn.getLocalChannel(), flow, null, null);
+    }
+
+    private void processSenderFlow()
+    {
+        if(_connectionEndpoint != null)
+        {
+            EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+            while(endpoint != null)
+            {
+
+                if(endpoint instanceof SenderImpl)
+                {
+                    SenderImpl sender = (SenderImpl) endpoint;
+                    if(sender.getDrain() && sender.getDrained() > 0)
+                    {
+                        TransportSender transportLink = 
sender.getTransportLink();
+                        TransportSession transportSession = 
sender.getSession().getTransportSession();
+                        UnsignedInteger credits = 
transportLink.getLinkCredit();
+                        
transportLink.setLinkCredit(UnsignedInteger.valueOf(0));
+                        
transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(credits));
+                        transportLink.setLinkCredit(UnsignedInteger.ZERO);
+                        sender.setDrained(0);
+
+                        writeFlow(transportSession, transportLink);
+                    }
+
+                }
+
+                endpoint = endpoint.transportNext();
+            }
+        }
+    }
+
+    private void dumpQueue(String msg)
+    {
+        System.out.print("  " + msg + "{");
+        DeliveryImpl dlv = _connectionEndpoint.getTransportWorkHead();
+        while (dlv != null) {
+            System.out.print(new Binary(dlv.getTag()) + ", ");
+            dlv = dlv.getTransportWorkNext();
+        }
+        System.out.println("}");
+    }
+
+    private void processTransportWork()
+    {
+        if(_connectionEndpoint != null)
+        {
+            DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
+            while(delivery != null)
+            {
+                LinkImpl link = delivery.getLink();
+                if (link instanceof SenderImpl) {
+                    if (processTransportWorkSender(delivery, (SenderImpl) 
link)) {
+                        delivery = delivery.clearTransportWork();
+                    } else {
+                        delivery = delivery.getTransportWorkNext();
+                    }
+                } else {
+                    if (processTransportWorkReceiver(delivery, (ReceiverImpl) 
link)) {
+                        delivery = delivery.clearTransportWork();
+                    } else {
+                        delivery = delivery.getTransportWorkNext();
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean processTransportWorkSender(DeliveryImpl delivery,
+                                               SenderImpl snd)
+    {
+        TransportLink<SenderImpl> tpLink = snd.getTransportLink();
+        SessionImpl session = snd.getSession();
+        TransportSession tpSession = session.getTransportSession();
+
+        boolean wasDone = delivery.isDone();
+
+        if(!delivery.isDone() &&
+           (delivery.getDataLength() > 0 || delivery != snd.current()) &&
+           tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
+           tpSession.isLocalChannelSet() &&
+           tpLink.getLocalHandle() != null && !_frameWriter.isFull())
+        {
+            UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId();
+            TransportDelivery tpDelivery = new TransportDelivery(deliveryId, 
delivery, tpLink);
+            delivery.setTransportDelivery(tpDelivery);
+
+            final Transfer transfer = new Transfer();
+            transfer.setDeliveryId(deliveryId.intValue());
+            transfer.setDeliveryTag(delivery.getTag());
+            transfer.setHandle(tpLink.getLocalHandle().intValue());
+
+            if(delivery.getLocalState() != null)
+            {
+                transfer.setState(delivery.getLocalState());
+            }
+
+            if(delivery.isSettled())
+            {
+                transfer.setSettled(Boolean.TRUE);
+            }
+            else
+            {
+                tpSession.addUnsettledOutgoing(deliveryId, delivery);
+            }
+
+            if(snd.current() == delivery)
+            {
+                transfer.setMore(true);
+            }
+
+            transfer.setMessageFormat(0);
+
+            // TODO - large frames
+            ByteBuffer payload = delivery.getData() ==  null ? null :
+                ByteBuffer.wrap(delivery.getData(), delivery.getDataOffset(),
+                                delivery.getDataLength());
+
+            writeFrame(tpSession.getLocalChannel(), transfer, payload,
+                       new PartialTransfer(transfer));
+            tpSession.incrementOutgoingId();
+            tpSession.decrementRemoteIncomingWindow();
+
+            if(payload == null || !payload.hasRemaining())
+            {
+                session.incrementOutgoingBytes(-delivery.pending());
+                delivery.setData(null);
+                delivery.setDataLength(0);
+
+                if (!transfer.getMore()) {
+                    delivery.setDone();
+                    
tpLink.setDeliveryCount(tpLink.getDeliveryCount().add(UnsignedInteger.ONE));
+                    
tpLink.setLinkCredit(tpLink.getLinkCredit().subtract(UnsignedInteger.ONE));
+                    tpSession.incrementOutgoingDeliveryId();
+                    session.incrementOutgoingDeliveries(-1);
+                    snd.decrementQueued();
+                }
+            }
+            else
+            {
+                int delta = delivery.getDataLength() - payload.remaining();
+                delivery.setDataOffset(delivery.getDataOffset() + delta);
+                delivery.setDataLength(payload.remaining());
+                session.incrementOutgoingBytes(-delta);
+            }
+
+            getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
+        }
+
+        if(wasDone && delivery.getLocalState() != null)
+        {
+            TransportDelivery tpDelivery = delivery.getTransportDelivery();
+            Disposition disposition = new Disposition();
+            disposition.setFirst(tpDelivery.getDeliveryId().intValue());
+            disposition.setLast(tpDelivery.getDeliveryId().intValue());
+            disposition.setRole(Role.SENDER);
+            disposition.setSettled(delivery.isSettled());
+            if(delivery.isSettled())
+            {
+                tpDelivery.settled();
+            }
+            disposition.setState(delivery.getLocalState());
+
+            writeFrame(tpSession.getLocalChannel(), disposition, null,
+                       null);
+        }
+
+        return !delivery.isBuffered();
+    }
+
+    private boolean processTransportWorkReceiver(DeliveryImpl delivery,
+                                                 ReceiverImpl rcv)
+    {
+        TransportDelivery tpDelivery = delivery.getTransportDelivery();
+        SessionImpl session = rcv.getSession();
+        TransportSession tpSession = session.getTransportSession();
+
+        if (tpSession.isLocalChannelSet())
+        {
+            Disposition disposition = new Disposition();
+            disposition.setFirst(tpDelivery.getDeliveryId().intValue());
+            disposition.setLast(tpDelivery.getDeliveryId().intValue());
+            disposition.setRole(Role.RECEIVER);
+            disposition.setSettled(delivery.isSettled());
+
+            disposition.setState(delivery.getLocalState());
+            writeFrame(tpSession.getLocalChannel(), disposition, null, null);
+            if (delivery.isSettled())
+            {
+                tpDelivery.settled();
+            }
+            return true;
+        }
+
+        return false;
+    }
+
+    private void processReceiverFlow()
+    {
+        if(_connectionEndpoint != null)
+        {
+            EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+            while(endpoint != null)
+            {
+                if(endpoint instanceof ReceiverImpl)
+                {
+                    ReceiverImpl receiver = (ReceiverImpl) endpoint;
+                    TransportLink<?> transportLink = 
getTransportState(receiver);
+                    TransportSession transportSession = 
getTransportState(receiver.getSession());
+
+                    if(receiver.getLocalState() == EndpointState.ACTIVE)
+                    {
+                        int credits = receiver.clearUnsentCredits();
+                        if(credits != 0 || receiver.getDrain() ||
+                           
transportSession.getIncomingWindowSize().equals(UnsignedInteger.ZERO))
+                        {
+                            transportLink.addCredit(credits);
+                            writeFlow(transportSession, transportLink);
+                        }
+                    }
+                }
+                endpoint = endpoint.transportNext();
+            }
+            endpoint = _connectionEndpoint.getTransportHead();
+            while(endpoint != null)
+            {
+                if(endpoint instanceof SessionImpl)
+                {
+
+                    SessionImpl session = (SessionImpl) endpoint;
+                    TransportSession transportSession = 
getTransportState(session);
+
+                    if(session.getLocalState() == EndpointState.ACTIVE)
+                    {
+                        
if(transportSession.getIncomingWindowSize().equals(UnsignedInteger.ZERO))
+                        {
+                            writeFlow(transportSession, null);
+                        }
+                    }
+                }
+                endpoint = endpoint.transportNext();
+            }
+        }
+    }
+
+    private void processAttach()
+    {
+        if(_connectionEndpoint != null)
+        {
+            EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+
+            while(endpoint != null)
+            {
+                if(endpoint instanceof LinkImpl)
+                {
+
+                    LinkImpl link = (LinkImpl) endpoint;
+                    TransportLink<?> transportLink = getTransportState(link);
+                    if(link.getLocalState() != EndpointState.UNINITIALIZED && 
!transportLink.attachSent())
+                    {
+
+                        if( (link.getRemoteState() == EndpointState.ACTIVE
+                            && !transportLink.isLocalHandleSet()) || 
link.getRemoteState() == EndpointState.UNINITIALIZED)
+                        {
+
+                            SessionImpl session = link.getSession();
+                            TransportSession transportSession = 
getTransportState(session);
+                            UnsignedInteger localHandle = 
transportSession.allocateLocalHandle(transportLink);
+
+                            if(link.getRemoteState() == 
EndpointState.UNINITIALIZED)
+                            {
+                                
transportSession.addHalfOpenLink(transportLink);
+                            }
+
+                            Attach attach = new Attach();
+                            attach.setHandle(localHandle.intValue());
+                            attach.setName(transportLink.getName());
+
+                            if(link.getSenderSettleMode() != null)
+                            {
+                                
attach.setSndSettleMode(link.getSenderSettleMode());
+                            }
+
+                            if(link.getReceiverSettleMode() != null)
+                            {
+                                
attach.setRcvSettleMode(link.getReceiverSettleMode());
+                            }
+
+                            if(link.getSource() != null)
+                            {
+                                attach.setSource(link.getSource());
+                            }
+
+                            if(link.getTarget() != null)
+                            {
+                                attach.setTarget(link.getTarget());
+                            }
+
+                            attach.setRole(endpoint instanceof ReceiverImpl ? 
Role.RECEIVER : Role.SENDER);
+
+                            if(link instanceof SenderImpl)
+                            {
+                                attach.setInitialDeliveryCount(0);
+                            }
+
+                            writeFrame(transportSession.getLocalChannel(), 
attach, null, null);
+                            transportLink.sentAttach();
+                        }
+                    }
+                }
+                endpoint = endpoint.transportNext();
+            }
+        }
+    }
+
+    private void processHeader()
+    {
+        if(!_headerWritten)
+        {
+            _frameWriter.writeHeader(AmqpHeader.HEADER);
+            _headerWritten = true;
+        }
+    }
+
+    private void processOpen()
+    {
+        if ((_condition != null ||
+             (_connectionEndpoint != null &&
+              _connectionEndpoint.getLocalState() != 
EndpointState.UNINITIALIZED)) &&
+            !_isOpenSent) {
+            Open open = new Open();
+            if (_connectionEndpoint != null) {
+                String cid = _connectionEndpoint.getLocalContainerId();
+                open.setContainerId(cid == null ? "" : cid);
+                open.setHostname(_connectionEndpoint.getHostname());
+                
open.setDesiredCapabilities(convertToStringArray(_connectionEndpoint.getDesiredCapabilities()));
+                
open.setOfferedCapabilities(convertToStringArray(_connectionEndpoint.getOfferedCapabilities()));
+                
open.setProperties(convertSymbolToStringKeyMap(_connectionEndpoint.getProperties()));
+            } else {
+                open.setContainerId("");
+            }
+
+            if (_maxFrameSize > 0) {
+                open.setMaxFrameSize(_maxFrameSize);
+            }
+            if (_channelMax > 0) {
+                open.setChannelMax(_channelMax);
+            }
+
+            // as per the recommendation in the spec, advertise half our
+            // actual timeout to the remote
+            if (_localIdleTimeout > 0) {
+                open.setIdleTimeOut(_localIdleTimeout / 2);
+            }
+            _isOpenSent = true;
+
+            writeFrame(0, open, null, null);
+        }
+    }
+
+    private void processBegin()
+    {
+        if(_connectionEndpoint != null)
+        {
+            EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+            while(endpoint != null)
+            {
+                if(endpoint instanceof SessionImpl)
+                {
+                    SessionImpl session = (SessionImpl) endpoint;
+                    TransportSession transportSession = 
getTransportState(session);
+                    if(session.getLocalState() != EndpointState.UNINITIALIZED 
&& !transportSession.beginSent())
+                    {
+                        int channelId = allocateLocalChannel(transportSession);
+                        Begin begin = new Begin();
+
+                        if(session.getRemoteState() != 
EndpointState.UNINITIALIZED)
+                        {
+                            
begin.setRemoteChannel(transportSession.getRemoteChannel());
+                        }
+                        
begin.setHandleMax(transportSession.getHandleMax().intValue());
+                        
begin.setIncomingWindow(transportSession.getIncomingWindowSize().intValue());
+                        
begin.setOutgoingWindow(transportSession.getOutgoingWindowSize().intValue());
+                        
begin.setNextOutgoingId(transportSession.getNextOutgoingId().intValue());
+
+                        writeFrame(channelId, begin, null, null);
+                        transportSession.sentBegin();
+                    }
+                }
+                endpoint = endpoint.transportNext();
+            }
+        }
+    }
+
+    private TransportSession getTransportState(SessionImpl session)
+    {
+        TransportSession transportSession = session.getTransportSession();
+        if(transportSession == null)
+        {
+            transportSession = new TransportSession(this, session);
+            session.setTransportSession(transportSession);
+        }
+        return transportSession;
+    }
+
+    private TransportLink<?> getTransportState(LinkImpl link)
+    {
+        TransportLink<?> transportLink = link.getTransportLink();
+        if(transportLink == null)
+        {
+            transportLink = TransportLink.createTransportLink(link);
+        }
+        return transportLink;
+    }
+
+    private int allocateLocalChannel(TransportSession transportSession)
+    {
+        for (int i = 0; i < _connectionEndpoint.getMaxChannels(); i++)
+        {
+            if (!_localSessions.containsKey(i))
+            {
+                _localSessions.put(i, transportSession);
+                transportSession.setLocalChannel(i);
+                return i;
+            }
+        }
+
+        return -1;
+    }
+
+    private int freeLocalChannel(TransportSession transportSession)
+    {
+        final int channel = transportSession.getLocalChannel();
+        _localSessions.remove(channel);
+        transportSession.freeLocalChannel();
+        return channel;
+    }
+
+    private void processEnd()
+    {
+        if(_connectionEndpoint != null)
+        {
+            EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+            while(endpoint != null)
+            {
+                SessionImpl session;
+                TransportSession transportSession;
+
+                if((endpoint instanceof SessionImpl)) {
+                    if ((session = (SessionImpl)endpoint).getLocalState() == 
EndpointState.CLOSED
+                        && (transportSession = 
session.getTransportSession()).isLocalChannelSet()
+                        && !_isCloseSent)
+                    {
+                        if (hasSendableMessages(session)) {
+                            endpoint = endpoint.transportNext();
+                            continue;
+                        }
+
+                        int channel = freeLocalChannel(transportSession);
+                        End end = new End();
+                        ErrorCondition localError = 
convertEndpointError(endpoint);
+                        if( localError.getCondition() !=null )
+                        {
+                            end.setError(localError);
+                        }
+
+                        writeFrame(channel, end, null, null);
+                    }
+
+                    endpoint.clearModified();
+                }
+
+                endpoint = endpoint.transportNext();
+            }
+        }
+    }
+
+    private boolean hasSendableMessages(SessionImpl session)
+    {
+        if (_connectionEndpoint == null) {
+            return false;
+        }
+
+        if(!_closeReceived && (session == null || 
!session.getTransportSession().endReceived()))
+        {
+            EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+            while(endpoint != null)
+            {
+                if(endpoint instanceof SenderImpl)
+                {
+                    SenderImpl sender = (SenderImpl) endpoint;
+                    if((session == null || sender.getSession() == session)
+                       && sender.getQueued() != 0
+                        && !getTransportState(sender).detachReceived())
+                    {
+                        return true;
+                    }
+                }
+                endpoint = endpoint.transportNext();
+            }
+        }
+        return false;
+    }
+
+    private void processClose()
+    {
+        if ((_condition != null ||
+             (_connectionEndpoint != null &&
+              _connectionEndpoint.getLocalState() == EndpointState.CLOSED)) &&
+            !_isCloseSent) {
+            if(!hasSendableMessages(null))
+            {
+                Close close = new Close();
+
+                ErrorCondition localError;
+
+                if (_connectionEndpoint == null) {
+                    localError = _condition;
+                } else {
+                    localError =  convertEndpointError(_connectionEndpoint);
+                }
+
+                if(localError.getCondition() != null)
+                {
+                    close.setError(localError);
+                }
+
+                _isCloseSent = true;
+
+                writeFrame(0, close, null, null);
+
+                if (_connectionEndpoint != null) {
+                    _connectionEndpoint.clearModified();
+                }
+            }
+        }
+    }
+
+    protected void writeFrame(int channel, Performative frameBody,
+                            ByteBuffer payload, Runnable onPayloadTooLarge)
+    {
+        _frameWriter.writeFrame(channel, frameBody, payload, 
onPayloadTooLarge);
+    }
+
+    
//==================================================================================================================
+
+    @Override
+    protected ConnectionImpl getConnectionImpl()
+    {
+        return _connectionEndpoint;
+    }
+
+    @Override
+    void postFinal() {}
+
+    @Override
+    void doFree() { }
+
+    
//==================================================================================================================
+    // handle incoming amqp data
+
+
+    public void handleOpen(Open open, Binary payload, Integer channel)
+    {
+        setRemoteState(EndpointState.ACTIVE);
+        if(_connectionEndpoint != null)
+        {
+            handleOpen(_connectionEndpoint,open);
+        }
+        else
+        {
+            _open = open;
+        }
+
+        if(open.getMaxFrameSize() > 0)
+        {
+            _remoteMaxFrameSize = (int) open.getMaxFrameSize();
+            _frameWriter.setMaxFrameSize(_remoteMaxFrameSize);
+        }
+
+        if (open.getChannelMax() > 0)
+        {
+            _remoteChannelMax = (int) open.getChannelMax();
+        }
+
+        if (open.getIdleTimeOut() > 0)
+        {
+            //TODO why is _remoteIdleTimeout int ?
+            _remoteIdleTimeout = (int)open.getIdleTimeOut();
+        }
+    }
+
+    public void handleBegin(Begin begin, Binary payload, Integer channel)
+    {
+        // TODO - check channel < max_channel
+        TransportSession transportSession = _remoteSessions.get(channel);
+        if(transportSession != null)
+        {
+            // TODO - fail due to begin on begun session
+        }
+        else
+        {
+            SessionImpl session;
+            if(begin.getRemoteChannel() == null)
+            {
+                session = _connectionEndpoint.session();
+                transportSession = getTransportState(session);
+            }
+            else
+            {
+                // TODO check null
+                transportSession = 
_localSessions.get(begin.getRemoteChannel().intValue());
+                if (transportSession == null) {
+                    throw new NullPointerException("uncorrelated channel: " + 
begin.getRemoteChannel());
+                }
+                session = transportSession.getSession();
+
+            }
+            transportSession.setRemoteChannel(channel);
+            session.setRemoteState(EndpointState.ACTIVE);
+            transportSession.setNextIncomingId(begin.getNextOutgoingId());
+            _remoteSessions.put(channel, transportSession);
+
+            _connectionEndpoint.put(Event.Type.SESSION_REMOTE_OPEN, session);
+        }
+
+    }
+
+    @Override
+    public void handleAttach(Attach attach, Binary payload, Integer channel)
+    {
+        TransportSession transportSession = _remoteSessions.get(channel);
+        if(transportSession == null)
+        {
+            // TODO - fail due to attach on non-begun session
+        }
+        else
+        {
+            SessionImpl session = transportSession.getSession();
+            final UnsignedInteger handle = attach.getHandle();
+            if (handle.compareTo(transportSession.getHandleMax()) > 0) {
+                // The handle-max value is the highest handle value that can 
be used on the session. A peer MUST
+                // NOT attempt to attach a link using a handle value outside 
the range that its partner can handle.
+                // A peer that receives a handle outside the supported range 
MUST close the connection with the
+                // framing-error error-code.
+                ErrorCondition condition =
+                        new ErrorCondition(ConnectionError.FRAMING_ERROR,
+                                                            "handle-max 
exceeded");
+                _connectionEndpoint.setCondition(condition);
+                _connectionEndpoint.setLocalState(EndpointState.CLOSED);
+                if (!_isCloseSent) {
+                    Close close = new Close();
+                    close.setError(condition);
+                    _isCloseSent = true;
+                    writeFrame(0, close, null, null);
+                }
+                close_tail();
+                return;
+            }
+            TransportLink<?> transportLink = 
transportSession.getLinkFromRemoteHandle(handle);
+            LinkImpl link = null;
+
+            if(transportLink != null)
+            {
+                // TODO - fail - attempt attach on a handle which is in use
+            }
+            else
+            {
+                transportLink = 
transportSession.resolveHalfOpenLink(attach.getName());
+                if(transportLink == null)
+                {
+
+                    link = (attach.getRole() == Role.RECEIVER)
+                           ? session.sender(attach.getName())
+                           : session.receiver(attach.getName());
+                    transportLink = getTransportState(link);
+                }
+                else
+                {
+                    link = transportLink.getLink();
+                }
+                if(attach.getRole() == Role.SENDER)
+                {
+                    
transportLink.setDeliveryCount(attach.getInitialDeliveryCount());
+                }
+
+                link.setRemoteState(EndpointState.ACTIVE);
+                link.setRemoteSource(attach.getSource());
+                link.setRemoteTarget(attach.getTarget());
+
+                link.setRemoteReceiverSettleMode(attach.getRcvSettleMode());
+                link.setRemoteSenderSettleMode(attach.getSndSettleMode());
+
+                transportLink.setName(attach.getName());
+                transportLink.setRemoteHandle(handle);
+                transportSession.addLinkRemoteHandle(transportLink, handle);
+
+            }
+
+            _connectionEndpoint.put(Event.Type.LINK_REMOTE_OPEN, link);
+        }
+    }
+
+    @Override
+    public void handleFlow(Flow flow, Binary payload, Integer channel)
+    {
+        TransportSession transportSession = _remoteSessions.get(channel);
+        if(transportSession == null)
+        {
+            // TODO - fail due to attach on non-begun session
+        }
+        else
+        {
+            transportSession.handleFlow(flow);
+        }
+
+    }
+
+    @Override
+    public void handleTransfer(Transfer transfer, Binary payload, Integer 
channel)
+    {
+        // TODO - check channel < max_channel
+        TransportSession transportSession = _remoteSessions.get(channel);
+        if(transportSession != null)
+        {
+            transportSession.handleTransfer(transfer, payload);
+        }
+        else
+        {
+            // TODO - fail due to begin on begun session
+        }
+    }
+
+    @Override
+    public void handleDisposition(Disposition disposition, Binary payload, 
Integer channel)
+    {
+        TransportSession transportSession = _remoteSessions.get(channel);
+        if(transportSession == null)
+        {
+            // TODO - fail due to attach on non-begun session
+        }
+        else
+        {
+            transportSession.handleDisposition(disposition);
+        }
+    }
+
+    @Override
+    public void handleDetach(Detach detach, Binary payload, Integer channel)
+    {
+        TransportSession transportSession = _remoteSessions.get(channel);
+        if(transportSession == null)
+        {
+            // TODO - fail due to attach on non-begun session
+        }
+        else
+        {
+            TransportLink<?> transportLink = 
transportSession.getLinkFromRemoteHandle(detach.getHandle());
+
+            if(transportLink != null)
+            {
+                LinkImpl link = transportLink.getLink();
+                transportLink.receivedDetach();
+                
transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
+                if (detach.getClosed()) {
+                    _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, 
link);
+                } else {
+                    _connectionEndpoint.put(Event.Type.LINK_REMOTE_DETACH, 
link);
+                }
+                transportLink.clearRemoteHandle();
+                link.setRemoteState(EndpointState.CLOSED);
+                if(detach.getError() != null)
+                {
+                    link.getRemoteCondition().copyFrom(detach.getError());
+                }
+            }
+            else
+            {
+                // TODO - fail - attempt attach on a handle which is in use
+            }
+        }
+    }
+
+    @Override
+    public void handleEnd(End end, Binary payload, Integer channel)
+    {
+        TransportSession transportSession = _remoteSessions.get(channel);
+        if(transportSession == null)
+        {
+            // TODO - fail due to attach on non-begun session
+        }
+        else
+        {
+            _remoteSessions.remove(channel);
+            transportSession.receivedEnd();
+            transportSession.unsetRemoteChannel();
+            SessionImpl session = transportSession.getSession();
+            session.setRemoteState(EndpointState.CLOSED);
+            ErrorCondition errorCondition = end.getError();
+            if(errorCondition != null)
+            {
+                session.getRemoteCondition().copyFrom(errorCondition);
+            }
+
+            _connectionEndpoint.put(Event.Type.SESSION_REMOTE_CLOSE, session);
+        }
+    }
+
+    @Override
+    public void handleClose(Close close, Binary payload, Integer channel)
+    {
+        _closeReceived = true;
+        _remoteIdleTimeout = 0;
+        setRemoteState(EndpointState.CLOSED);
+        if(_connectionEndpoint != null)
+        {
+            _connectionEndpoint.setRemoteState(EndpointState.CLOSED);
+            if(close.getError() != null)
+            {
+                
_connectionEndpoint.getRemoteCondition().copyFrom(close.getError());
+            }
+
+            _connectionEndpoint.put(Event.Type.CONNECTION_REMOTE_CLOSE, 
_connectionEndpoint);
+        }
+
+    }
+
+    @Override
+    public boolean handleFrame(TransportFrame2 frame)
+    {
+        if (!isHandlingFrames())
+        {
+            throw new IllegalStateException("Transport cannot accept frame: " 
+ frame);
+        }
+
+        log(INCOMING, frame);
+
+        ProtocolTracer tracer = _protocolTracer.get();
+        if( tracer != null )
+        {
+            tracer.receivedFrame(frame);
+        }
+
+        frame.getBody().invoke(this,frame.getPayload(), frame.getChannel());
+        return _closeReceived;
+    }
+
+    void put(Event.Type type, Object context) {
+        if (_connectionEndpoint != null) {
+            _connectionEndpoint.put(type, context);
+        }
+    }
+
+    private void maybePostClosed()
+    {
+        if (postedHeadClosed && postedTailClosed) {
+            put(Event.Type.TRANSPORT_CLOSED, this);
+        }
+    }
+
+    @Override
+    public void closed(TransportException error)
+    {
+        if (!_closeReceived || error != null) {
+            if (error == null) {
+                _condition = new ErrorCondition(ConnectionError.FRAMING_ERROR,
+                                               "connection aborted");
+            } else {
+                _condition = new ErrorCondition(ConnectionError.FRAMING_ERROR,
+                                                error.toString());
+            }
+            _head_closed = true;
+        }
+        if (_condition != null) {
+            put(Event.Type.TRANSPORT_ERROR, this);
+        }
+        if (!postedTailClosed) {
+            put(Event.Type.TRANSPORT_TAIL_CLOSED, this);
+            postedTailClosed = true;
+            maybePostClosed();
+        }
+    }
+
+    @Override
+    public boolean isHandlingFrames()
+    {
+        return _connectionEndpoint != null || getRemoteState() == 
EndpointState.UNINITIALIZED;
+    }
+
+    @Override
+    public ProtocolTracer getProtocolTracer()
+    {
+        return _protocolTracer.get();
+    }
+
+    @Override
+    public void setProtocolTracer(ProtocolTracer protocolTracer)
+    {
+        this._protocolTracer.set(protocolTracer);
+    }
+
+    @Override
+    public ByteBuffer getInputBuffer()
+    {
+        return tail();
+    }
+
+    @Override
+    public TransportResult processInput()
+    {
+        try {
+            process();
+            return TransportResultFactory.ok();
+        } catch (TransportException e) {
+            return TransportResultFactory.error(e);
+        }
+    }
+
+    @Override
+    public ByteBuffer getOutputBuffer()
+    {
+        pending();
+        return head();
+    }
+
+    @Override
+    public void outputConsumed()
+    {
+        pop(_outputProcessor.head().position());
+    }
+
+    @Override
+    public int capacity()
+    {
+        init();
+        return _inputProcessor.capacity();
+    }
+
+    @Override
+    public ByteBuffer tail()
+    {
+        init();
+        return _inputProcessor.tail();
+    }
+
+    @Override
+    public void process() throws TransportException
+    {
+        _processingStarted = true;
+
+        try {
+            init();
+            int beforePosition = _inputProcessor.position();
+            _inputProcessor.process();
+            _bytesInput += beforePosition - _inputProcessor.position();
+        } catch (TransportException e) {
+            _head_closed = true;
+            throw e;
+        }
+    }
+
+    @Override
+    public void close_tail()
+    {
+        init();
+        _inputProcessor.close_tail();
+    }
+
+    @Override
+    public int pending()
+    {
+        init();
+        return _outputProcessor.pending();
+    }
+
+    @Override
+    public ByteBuffer head()
+    {
+        init();
+        return _outputProcessor.head();
+    }
+
+    @Override
+    public void pop(int bytes)
+    {
+        init();
+        _outputProcessor.pop(bytes);
+        _bytesOutput += bytes;
+
+        int p = pending();
+        if (p < 0 && !postedHeadClosed) {
+            put(Event.Type.TRANSPORT_HEAD_CLOSED, this);
+            postedHeadClosed = true;
+            maybePostClosed();
+        }
+    }
+
+    public void setIdleTimeout(int timeout) {
+        _localIdleTimeout = timeout;
+    }
+
+    public int getIdleTimeout() {
+        return _localIdleTimeout;
+    }
+
+    public int getRemoteIdleTimeout() {
+        return _remoteIdleTimeout;
+    }
+
+    @Override
+    public long tick(long now)
+    {
+        long timeout = 0;
+
+        if (_localIdleTimeout > 0) {
+            if (_localIdleDeadline == 0 || _lastBytesInput != _bytesInput) {
+                _localIdleDeadline = now + _localIdleTimeout;
+                _lastBytesInput = _bytesInput;
+            } else if (_localIdleDeadline <= now) {
+                _localIdleDeadline = now + _localIdleTimeout;
+
+                if (_connectionEndpoint != null &&
+                    _connectionEndpoint.getLocalState() != 
EndpointState.CLOSED) {
+                    ErrorCondition condition =
+                            new 
ErrorCondition(Symbol.getSymbol("amqp:resource-limit-exceeded"),
+                                                                
"local-idle-timeout expired");
+                    _connectionEndpoint.setCondition(condition);
+                    _connectionEndpoint.setLocalState(EndpointState.CLOSED);
+
+                    if (!_isOpenSent) {
+                        if ((_sasl != null) && (!_sasl.isDone())) {
+                            _sasl.fail();
+                        }
+                        Open open = new Open();
+                        _isOpenSent = true;
+                        writeFrame(0, open, null, null);
+                    }
+                    if (!_isCloseSent) {
+                        Close close = new Close();
+                        close.setError(condition);
+                        _isCloseSent = true;
+                        writeFrame(0, close, null, null);
+                    }
+                    close_tail();
+                }
+            }
+            timeout = _localIdleDeadline;
+        }
+
+        if (_remoteIdleTimeout != 0 && !_isCloseSent) {
+            if (_remoteIdleDeadline == 0 || _lastBytesOutput != _bytesOutput) {
+                _remoteIdleDeadline = now + (_remoteIdleTimeout / 2);
+                _lastBytesOutput = _bytesOutput;
+            } else if (_remoteIdleDeadline <= now) {
+                _remoteIdleDeadline = now + (_remoteIdleTimeout / 2);
+                if (pending() == 0) {
+                    writeFrame(0, null, null, null);
+                    _lastBytesOutput += pending();
+                }
+            }
+            timeout = Math.min(timeout == 0 ? _remoteIdleDeadline : timeout, 
_remoteIdleDeadline);
+        }
+
+        return timeout;
+    }
+
+    @Override
+    public long getFramesOutput()
+    {
+        return _frameWriter.getFramesOutput();
+    }
+
+    @Override
+    public long getFramesInput()
+    {
+        return _frameParser.getFramesInput();
+    }
+
+    @Override
+    public void close_head()
+    {
+        _outputProcessor.close_head();
+    }
+
+    public boolean isClosed() {
+        int p = pending();
+        int c = capacity();
+        return  p == END_OF_STREAM && c == END_OF_STREAM;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "TransportImpl [_connectionEndpoint=" + _connectionEndpoint + 
", " + super.toString() + "]";
+    }
+
+    private static class PartialTransfer implements Runnable
+    {
+        private final Transfer _transfer;
+
+        public PartialTransfer(Transfer transfer)
+        {
+            _transfer = transfer;
+        }
+
+        @Override
+        public void run()
+        {
+            _transfer.setMore(true);
+        }
+    }
+
+    /**
+     * Override the default frame handler. Must be called before the transport 
starts being used
+     * (e.g. {@link #getInputBuffer()}, {@link #getOutputBuffer()}, {@link 
#ssl(SslDomain)} etc).
+     */
+    public void setFrameHandler(FrameHandler frameHandler)
+    {
+        _frameHandler = frameHandler;
+    }
+
+    static String INCOMING = "<-";
+    static String OUTGOING = "->";
+
+    void log(String event, TransportFrame frame)
+    {
+        if ((_levels & TRACE_FRM) != 0) {
+            StringBuilder msg = new StringBuilder();
+            msg.append("[").append(System.identityHashCode(this)).append(":")
+                .append(frame.getChannel()).append("]");
+            msg.append(" ").append(event).append(" ").append(frame.getBody());
+            if (frame.getPayload() != null) {
+                String payload = frame.getPayload().toString();
+                if (payload.length() > TRACE_FRAME_PAYLOAD_LENGTH) {
+                    payload = payload.substring(0, TRACE_FRAME_PAYLOAD_LENGTH) 
+ "(" + payload.length() + ")";
+                }
+                msg.append(" \"").append(payload).append("\"");
+            }
+            System.out.println(msg.toString());
+        }
+    }
+
+    @Override
+    void localOpen() {}
+
+    @Override
+    void localClose() {}
+    
+    //TODO temp hack as the layer above still uses AMQP types. 
--------------------------------------------
+    private void handleOpen(ConnectionImpl connection, Open open)
+    {
+        connection.setRemoteState(EndpointState.ACTIVE);
+        connection.setRemoteHostname(open.getHostname());
+        connection.setRemoteContainer(open.getContainerId());
+        
connection.setRemoteDesiredCapabilities(convertToSymbolArray(open.getDesiredCapabilities()));
+        
connection.setRemoteOfferedCapabilities(convertToSymbolArray(open.getOfferedCapabilities()));
+        
connection.setRemoteProperties(convertStringToSymbolKeyMap(open.getProperties()));
+        connection.addConnectionOpenEvent();
+    }
+    
+    private Map<String, Object> convertSymbolToStringKeyMap(Map<Symbol, 
Object> in)
+    {
+        if (in == null)
+        {
+            return null;
+        }
+        Map<String, Object> out = new HashMap<String, Object>(in.size());
+        for (Symbol sym : in.keySet())
+        {
+            out.put(sym.toString(), in.get(sym));
+        }
+        return out;
+    }
+
+    private Map<Symbol, Object> convertStringToSymbolKeyMap(Map<String, 
Object> in)
+    {
+        if (in == null)
+        {
+            return null;
+        }
+        Map<Symbol, Object> out = new HashMap<Symbol, Object>(in.size());
+        for (String str : in.keySet())
+        {
+            out.put(Symbol.valueOf(str), in.get(str));
+        }
+        return out;
+    }
+    
+    private Symbol[] convertToSymbolArray(String[] in)
+    {
+        if (in == null)
+        {
+            return null;
+        }
+        Symbol[] out = new Symbol[in.length];
+        for(int i=0; i < in.length; i++)
+        {
+            out[i] = Symbol.valueOf(in[i]);
+        }
+        return out;
+    }
+
+    private String[] convertToStringArray(Symbol[] in)
+    {
+        if (in == null)
+        {
+            return null;
+        }
+        String[] out = new String[in.length];
+        for(int i=0; i < in.length; i++)
+        {
+            out[i] = in[i].toString();
+        }
+        return out;
+    }
+    
+    private ErrorCondition convertEndpointError(Endpoint endpoint)
+    {
+        if (endpoint != null && endpoint.getCondition() != null)
+        {
+            return new 
ErrorCondition(endpoint.getCondition().getCondition().toString(), 
endpoint.getCondition().getDescription());
+        }
+        else
+        {
+            return null;
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to