http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportResultFactory.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportResultFactory.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportResultFactory.java
deleted file mode 100644
index c4b79b4..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportResultFactory.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine;
-
-import static org.apache.qpid.proton.engine.TransportResult.Status.ERROR;
-import static org.apache.qpid.proton.engine.TransportResult.Status.OK;
-
-import java.util.IllegalFormatException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-
-/**
- * Creates TransportResults.
- * Only intended for use by internal Proton classes.
- * This class resides in the api module so it can be used by both proton-j-impl and proton-jni.
- */
-public class TransportResultFactory
-{
-    private static final Logger LOGGER = Logger.getLogger(TransportResultFactory.class.getName());
-
-    private static final TransportResult _okResult = new TransportResultImpl(OK, null, null);
-
-    public static TransportResult ok()
-    {
-        return _okResult;
-    }
-
-    public static TransportResult error(String format, Object... args)
-    {
-        String errorDescription;
-        try
-        {
-            errorDescription = String.format(format, args);
-        }
-        catch(IllegalFormatException e)
-        {
-            LOGGER.log(Level.SEVERE, "Formating error in string " + format, e);
-            errorDescription = format;
-        }
-        return new TransportResultImpl(ERROR, errorDescription, null);
-    }
-
-    public static TransportResult error(final String errorDescription)
-    {
-        return new TransportResultImpl(ERROR, errorDescription, null);
-    }
-
-    public static TransportResult error(final Exception e)
-    {
-        return new TransportResultImpl(ERROR, e == null ? null : e.toString(), e);
-    }
-
-    private static final class TransportResultImpl implements TransportResult
-    {
-        private final String _errorDescription;
-        private final Status _status;
-        private final Exception _exception;
-
-        private TransportResultImpl(Status status, String errorDescription, Exception exception)
-        {
-            _status = status;
-            _errorDescription = errorDescription;
-            _exception = exception;
-        }
-
-        @Override
-        public boolean isOk()
-        {
-            return _status == OK;
-        }
-
-        @Override
-        public Status getStatus()
-        {
-            return _status;
-        }
-
-        @Override
-        public String getErrorDescription()
-        {
-            return _errorDescription;
-        }
-
-        @Override
-        public Exception getException()
-        {
-            return _exception;
-        }
-
-        @Override
-        public void checkIsOk()
-        {
-            if (!isOk())
-            {
-                Exception e = getException();
-                if (e != null)
-                {
-                    throw new TransportException(e);
-                }
-                else
-                {
-                    throw new TransportException(getErrorDescription());
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpErrorException.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpErrorException.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpErrorException.java
deleted file mode 100644
index b1d52ed..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpErrorException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-public class AmqpErrorException extends Exception
-{
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpHeader.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpHeader.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpHeader.java
deleted file mode 100644
index 41f7e30..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpHeader.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.proton.engine.impl;
-
-public interface AmqpHeader
-{
-    public static final byte[] HEADER = new byte[]
-            { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 };
-
-    public static final byte[] SASL_HEADER = new byte[]
-            { 'A', 'M', 'Q', 'P', 3, 1, 0, 0 };
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ByteBufferUtils.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ByteBufferUtils.java
deleted file mode 100644
index 8616bee..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ByteBufferUtils.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.TransportException;
-
-public class ByteBufferUtils
-{
-    /**
-     * @return number of bytes poured
-     */
-    public static int pour(ByteBuffer source, ByteBuffer destination)
-    {
-        int numberOfBytesToPour = Math.min(source.remaining(), destination.remaining());
-        ByteBuffer sourceSubBuffer = source.duplicate();
-        sourceSubBuffer.limit(sourceSubBuffer.position() + numberOfBytesToPour);
-        destination.put(sourceSubBuffer);
-        source.position(source.position() + numberOfBytesToPour);
-        return numberOfBytesToPour;
-    }
-
-    /**
-     * Assumes {@code destination} is ready to be written.
-     *
-     * @return number of bytes poured which may be fewer than {@code sizeRequested} if
-     * {@code destination} has insufficient remaining
-     */
-    public static int pourArrayToBuffer(byte[] source, int offset, int sizeRequested, ByteBuffer destination)
-    {
-        int numberToWrite = Math.min(destination.remaining(), sizeRequested);
-        destination.put(source, offset, numberToWrite);
-        return numberToWrite;
-    }
-
-    /**
-     * Pours the contents of {@code source} into {@code destinationTransportInput}, calling
-     * the TransportInput many times if necessary.  If the TransportInput returns a {@link org.apache.qpid.proton.engine.TransportResult}
-     * other than ok, data may remain in source.
-     */
-    public static int pourAll(ByteBuffer source, TransportInput destinationTransportInput) throws TransportException
-    {
-        int capacity = destinationTransportInput.capacity();
-        if (capacity == Transport.END_OF_STREAM)
-        {
-            if (source.hasRemaining()) {
-                throw new IllegalStateException("Destination has reached end of stream: " +
-                                                destinationTransportInput);
-            } else {
-                return Transport.END_OF_STREAM;
-            }
-        }
-
-        int total = source.remaining();
-
-        while(source.hasRemaining() && destinationTransportInput.capacity() > 0)
-        {
-            pour(source, destinationTransportInput.tail());
-            destinationTransportInput.process();
-        }
-
-        return total - source.remaining();
-    }
-
-    /**
-     * Assumes {@code source} is ready to be read.
-     *
-     * @return number of bytes poured which may be fewer than {@code sizeRequested} if
-     * {@code source} has insufficient remaining
-     */
-    public static int pourBufferToArray(ByteBuffer source, byte[] destination, int offset, int sizeRequested)
-    {
-        int numberToRead = Math.min(source.remaining(), sizeRequested);
-        source.get(destination, offset, numberToRead);
-        return numberToRead;
-    }
-
-    public static ByteBuffer newWriteableBuffer(int capacity)
-    {
-        ByteBuffer newBuffer = ByteBuffer.allocate(capacity);
-        return newBuffer;
-    }
-
-    public static ByteBuffer newReadableBuffer(int capacity)
-    {
-        ByteBuffer newBuffer = ByteBuffer.allocate(capacity);
-        newBuffer.flip();
-        return newBuffer;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
deleted file mode 100644
index b4d3925..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.EventType;
-
-
-/**
- * CollectorImpl
- *
- */
-
-public class CollectorImpl implements Collector
-{
-
-    private EventImpl head;
-    private EventImpl tail;
-    private EventImpl free;
-
-    public CollectorImpl()
-    {}
-
-    @Override
-    public Event peek()
-    {
-        return head;
-    }
-
-    @Override
-    public void pop()
-    {
-        if (head != null) {
-            EventImpl next = head.next;
-            head.next = free;
-            free = head;
-            head.clear();
-            head = next;
-        }
-    }
-
-    public EventImpl put(EventType type, Object context)
-    {
-        if (type == null) {
-            throw new IllegalArgumentException("Type cannot be null");
-        }
-        if (!type.isValid()) {
-            throw new IllegalArgumentException("Cannot put events of type " + type);
-        }
-        if (tail != null && tail.getEventType() == type &&
-            tail.getContext() == context) {
-            return null;
-        }
-
-        EventImpl event;
-        if (free == null) {
-            event = new EventImpl();
-        } else {
-            event = free;
-            free = free.next;
-            event.next = null;
-        }
-
-        event.init(type, context);
-
-        if (head == null) {
-            head = event;
-            tail = event;
-        } else {
-            tail.next = event;
-            tail = event;
-        }
-
-        return event;
-    }
-
-    @Override
-    public boolean more() {
-        return head != null && head.next != null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
deleted file mode 100644
index 2878a39..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
+++ /dev/null
@@ -1,670 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.Open;
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.ProtonJConnection;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.reactor.Reactor;
-
-public class ConnectionImpl extends EndpointImpl implements ProtonJConnection
-{
-    public static final int MAX_CHANNELS = 65535;
-
-    private List<SessionImpl> _sessions = new ArrayList<SessionImpl>();
-    private EndpointImpl _transportTail;
-    private EndpointImpl _transportHead;
-    private int _maxChannels = MAX_CHANNELS;
-
-    private LinkNode<SessionImpl> _sessionHead;
-    private LinkNode<SessionImpl> _sessionTail;
-
-
-    private LinkNode<LinkImpl> _linkHead;
-    private LinkNode<LinkImpl> _linkTail;
-
-
-    private DeliveryImpl _workHead;
-    private DeliveryImpl _workTail;
-
-    private TransportImpl _transport;
-    private DeliveryImpl _transportWorkHead;
-    private DeliveryImpl _transportWorkTail;
-    private int _transportWorkSize = 0;
-    private String _localContainerId = "";
-    private String _localHostname;
-    private String _remoteContainer;
-    private String _remoteHostname;
-    private Symbol[] _offeredCapabilities;
-    private Symbol[] _desiredCapabilities;
-    private Symbol[] _remoteOfferedCapabilities;
-    private Symbol[] _remoteDesiredCapabilities;
-    private Map<Symbol, Object> _properties;
-    private Map<Symbol, Object> _remoteProperties;
-
-    private Object _context;
-    private CollectorImpl _collector;
-    private Reactor _reactor;
-
-    private static final Symbol[] EMPTY_SYMBOL_ARRAY = new Symbol[0];
-
-    /**
-     * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
-     * Client code outside this module should use {@link org.apache.qpid.proton.engine.Connection.Factory#create()} instead.
-     */
-    @Deprecated public ConnectionImpl()
-    {
-    }
-
-    @Override
-    public SessionImpl session()
-    {
-        SessionImpl session = new SessionImpl(this);
-        _sessions.add(session);
-
-
-        return session;
-    }
-
-    void freeSession(SessionImpl session)
-    {
-        _sessions.remove(session);
-    }
-
-    protected LinkNode<SessionImpl> addSessionEndpoint(SessionImpl endpoint)
-    {
-        LinkNode<SessionImpl> node;
-        if(_sessionHead == null)
-        {
-            node = _sessionHead = _sessionTail = LinkNode.newList(endpoint);
-        }
-        else
-        {
-            node = _sessionTail = _sessionTail.addAtTail(endpoint);
-        }
-        return node;
-    }
-
-    void removeSessionEndpoint(LinkNode<SessionImpl> node)
-    {
-        LinkNode<SessionImpl> prev = node.getPrev();
-        LinkNode<SessionImpl> next = node.getNext();
-
-        if(_sessionHead == node)
-        {
-            _sessionHead = next;
-        }
-        if(_sessionTail == node)
-        {
-            _sessionTail = prev;
-        }
-        node.remove();
-    }
-
-
-    protected LinkNode<LinkImpl> addLinkEndpoint(LinkImpl endpoint)
-    {
-        LinkNode<LinkImpl> node;
-        if(_linkHead == null)
-        {
-            node = _linkHead = _linkTail = LinkNode.newList(endpoint);
-        }
-        else
-        {
-            node = _linkTail = _linkTail.addAtTail(endpoint);
-        }
-        return node;
-    }
-
-
-    void removeLinkEndpoint(LinkNode<LinkImpl> node)
-    {
-        LinkNode<LinkImpl> prev = node.getPrev();
-        LinkNode<LinkImpl> next = node.getNext();
-
-        if(_linkHead == node)
-        {
-            _linkHead = next;
-        }
-        if(_linkTail == node)
-        {
-            _linkTail = prev;
-        }
-        node.remove();
-    }
-
-
-    @Override
-    public Session sessionHead(final EnumSet<EndpointState> local, final EnumSet<EndpointState> remote)
-    {
-        if(_sessionHead == null)
-        {
-            return null;
-        }
-        else
-        {
-            LinkNode.Query<SessionImpl> query = new EndpointImplQuery<SessionImpl>(local, remote);
-            LinkNode<SessionImpl> node = query.matches(_sessionHead) ? _sessionHead : _sessionHead.next(query);
-            return node == null ? null : node.getValue();
-        }
-    }
-
-    @Override
-    public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
-    {
-        if(_linkHead == null)
-        {
-            return null;
-        }
-        else
-        {
-            LinkNode.Query<LinkImpl> query = new EndpointImplQuery<LinkImpl>(local, remote);
-            LinkNode<LinkImpl> node = query.matches(_linkHead) ? _linkHead : _linkHead.next(query);
-            return node == null ? null : node.getValue();
-        }
-    }
-
-    @Override
-    protected ConnectionImpl getConnectionImpl()
-    {
-        return this;
-    }
-
-    @Override
-    void postFinal() {
-        put(Event.Type.CONNECTION_FINAL, this);
-    }
-
-    @Override
-    void doFree() {
-        List<SessionImpl> sessions = new ArrayList<SessionImpl>(_sessions);
-        for(Session session : sessions) {
-            session.free();
-        }
-        _sessions = null;
-    }
-
-    void modifyEndpoints() {
-        if (_sessions != null) {
-            for (SessionImpl ssn: _sessions) {
-                ssn.modifyEndpoints();
-            }
-        }
-        if (!freed) {
-            modified();
-        }
-    }
-
-    void handleOpen(Open open)
-    {
-        // TODO - store state
-        setRemoteState(EndpointState.ACTIVE);
-        setRemoteHostname(open.getHostname());
-        setRemoteContainer(open.getContainerId());
-        setRemoteDesiredCapabilities(open.getDesiredCapabilities());
-        setRemoteOfferedCapabilities(open.getOfferedCapabilities());
-        setRemoteProperties(open.getProperties());
-        put(Event.Type.CONNECTION_REMOTE_OPEN, this);
-    }
-
-
-    EndpointImpl getTransportHead()
-    {
-        return _transportHead;
-    }
-
-    EndpointImpl getTransportTail()
-    {
-        return _transportTail;
-    }
-
-    void addModified(EndpointImpl endpoint)
-    {
-        if(_transportTail == null)
-        {
-            endpoint.setTransportNext(null);
-            endpoint.setTransportPrev(null);
-            _transportHead = _transportTail = endpoint;
-        }
-        else
-        {
-            _transportTail.setTransportNext(endpoint);
-            endpoint.setTransportPrev(_transportTail);
-            _transportTail = endpoint;
-            _transportTail.setTransportNext(null);
-        }
-    }
-
-    void removeModified(EndpointImpl endpoint)
-    {
-        if(_transportHead == endpoint)
-        {
-            _transportHead = endpoint.transportNext();
-        }
-        else
-        {
-            endpoint.transportPrev().setTransportNext(endpoint.transportNext());
-        }
-
-        if(_transportTail == endpoint)
-        {
-            _transportTail = endpoint.transportPrev();
-        }
-        else
-        {
-            endpoint.transportNext().setTransportPrev(endpoint.transportPrev());
-        }
-    }
-
-    @Override
-    public int getMaxChannels()
-    {
-        return _maxChannels;
-    }
-
-    public String getLocalContainerId()
-    {
-        return _localContainerId;
-    }
-
-    @Override
-    public void setLocalContainerId(String localContainerId)
-    {
-        _localContainerId = localContainerId;
-    }
-
-    @Override
-    public DeliveryImpl getWorkHead()
-    {
-        return _workHead;
-    }
-
-    @Override
-    public void setContainer(String container)
-    {
-        _localContainerId = container;
-    }
-
-    @Override
-    public String getContainer()
-    {
-        return _localContainerId;
-    }
-
-    @Override
-    public void setHostname(String hostname)
-    {
-        _localHostname = hostname;
-    }
-
-    @Override
-    public String getRemoteContainer()
-    {
-        return _remoteContainer;
-    }
-
-    @Override
-    public String getRemoteHostname()
-    {
-        return _remoteHostname;
-    }
-
-    @Override
-    public void setOfferedCapabilities(Symbol[] capabilities)
-    {
-        _offeredCapabilities = capabilities;
-    }
-
-    @Override
-    public void setDesiredCapabilities(Symbol[] capabilities)
-    {
-        _desiredCapabilities = capabilities;
-    }
-
-    @Override
-    public Symbol[] getRemoteOfferedCapabilities()
-    {
-        return _remoteOfferedCapabilities == null ? EMPTY_SYMBOL_ARRAY : _remoteOfferedCapabilities;
-    }
-
-    @Override
-    public Symbol[] getRemoteDesiredCapabilities()
-    {
-        return _remoteDesiredCapabilities == null ? EMPTY_SYMBOL_ARRAY : _remoteDesiredCapabilities;
-    }
-
-
-    Symbol[] getOfferedCapabilities()
-    {
-        return _offeredCapabilities;
-    }
-
-    Symbol[] getDesiredCapabilities()
-    {
-        return _desiredCapabilities;
-    }
-
-    void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
-    {
-        _remoteOfferedCapabilities = remoteOfferedCapabilities;
-    }
-
-    void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
-    {
-        _remoteDesiredCapabilities = remoteDesiredCapabilities;
-    }
-
-
-    Map<Symbol, Object> getProperties()
-    {
-        return _properties;
-    }
-
-    @Override
-    public void setProperties(Map<Symbol, Object> properties)
-    {
-        _properties = properties;
-    }
-
-    @Override
-    public Map<Symbol, Object> getRemoteProperties()
-    {
-        return _remoteProperties;
-    }
-
-    void setRemoteProperties(Map<Symbol, Object> remoteProperties)
-    {
-        _remoteProperties = remoteProperties;
-    }
-
-    @Override
-    public String getHostname()
-    {
-        return _localHostname;
-    }
-
-    void setRemoteContainer(String remoteContainerId)
-    {
-        _remoteContainer = remoteContainerId;
-    }
-
-    void setRemoteHostname(String remoteHostname)
-    {
-        _remoteHostname = remoteHostname;
-    }
-
-    DeliveryImpl getWorkTail()
-    {
-        return _workTail;
-    }
-
-    void removeWork(DeliveryImpl delivery)
-    {
-        if (!delivery._work) return;
-
-        DeliveryImpl next = delivery.getWorkNext();
-        DeliveryImpl prev = delivery.getWorkPrev();
-
-        if (prev != null) {
-            prev.setWorkNext(next);
-        }
-
-        if (next != null) {
-            next.setWorkPrev(prev);
-        }
-
-
-        if(_workHead == delivery)
-        {
-            _workHead = next;
-
-        }
-
-        if(_workTail == delivery)
-        {
-            _workTail = prev;
-        }
-
-        delivery._work = false;
-    }
-
-    void addWork(DeliveryImpl delivery)
-    {
-        if (delivery._work) return;
-
-        delivery.setWorkNext(null);
-        delivery.setWorkPrev(_workTail);
-
-        if (_workTail != null) {
-            _workTail.setWorkNext(delivery);
-        }
-
-        _workTail = delivery;
-
-        if (_workHead == null) {
-            _workHead = delivery;
-        }
-
-        delivery._work = true;
-    }
-
-    public Iterator<DeliveryImpl> getWorkSequence()
-    {
-        return new WorkSequence(_workHead);
-    }
-
-    void setTransport(TransportImpl transport)
-    {
-        _transport = transport;
-    }
-
-    @Override
-    public TransportImpl getTransport()
-    {
-        return _transport;
-    }
-
-    private static class WorkSequence implements Iterator<DeliveryImpl>
-    {
-        private DeliveryImpl _next;
-
-        public WorkSequence(DeliveryImpl workHead)
-        {
-            _next = workHead;
-        }
-
-        @Override
-        public boolean hasNext()
-        {
-            return _next != null;
-        }
-
-        @Override
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public DeliveryImpl next()
-        {
-            DeliveryImpl next = _next;
-            if(next != null)
-            {
-                _next = next.getWorkNext();
-            }
-            return next;
-        }
-    }
-
-    DeliveryImpl getTransportWorkHead()
-    {
-        return _transportWorkHead;
-    }
-
-    int getTransportWorkSize() {
-        return _transportWorkSize;
-    }
-
-    public void removeTransportWork(DeliveryImpl delivery)
-    {
-        if (!delivery._transportWork) return;
-
-        DeliveryImpl next = delivery.getTransportWorkNext();
-        DeliveryImpl prev = delivery.getTransportWorkPrev();
-
-        if (prev != null) {
-            prev.setTransportWorkNext(next);
-        }
-
-        if (next != null) {
-            next.setTransportWorkPrev(prev);
-        }
-
-
-        if(_transportWorkHead == delivery)
-        {
-            _transportWorkHead = next;
-
-        }
-
-        if(_transportWorkTail == delivery)
-        {
-            _transportWorkTail = prev;
-        }
-
-        delivery._transportWork = false;
-        _transportWorkSize--;
-    }
-
-    void addTransportWork(DeliveryImpl delivery)
-    {
-        modified();
-        if (delivery._transportWork) return;
-
-        delivery.setTransportWorkNext(null);
-        delivery.setTransportWorkPrev(_transportWorkTail);
-
-        if (_transportWorkTail != null) {
-            _transportWorkTail.setTransportWorkNext(delivery);
-        }
-
-        _transportWorkTail = delivery;
-
-        if (_transportWorkHead == null) {
-            _transportWorkHead = delivery;
-        }
-
-        delivery._transportWork = true;
-        _transportWorkSize++;
-    }
-
-    void workUpdate(DeliveryImpl delivery)
-    {
-        if(delivery != null)
-        {
-            if(!delivery.isSettled() &&
-               (delivery.isReadable() ||
-                delivery.isWritable() ||
-                delivery.isUpdated()))
-            {
-                addWork(delivery);
-            }
-            else
-            {
-                removeWork(delivery);
-            }
-        }
-    }
-
-    @Override
-    public Object getContext()
-    {
-        return _context;
-    }
-
-    @Override
-    public void setContext(Object context)
-    {
-        _context = context;
-    }
-
-    @Override
-    public void collect(Collector collector)
-    {
-        _collector = (CollectorImpl) collector;
-
-        put(Event.Type.CONNECTION_INIT, this);
-
-        LinkNode<SessionImpl> ssn = _sessionHead;
-        while (ssn != null) {
-            put(Event.Type.SESSION_INIT, ssn.getValue());
-            ssn = ssn.getNext();
-        }
-
-        LinkNode<LinkImpl> lnk = _linkHead;
-        while (lnk != null) {
-            put(Event.Type.LINK_INIT, lnk.getValue());
-            lnk = lnk.getNext();
-        }
-    }
-
-    EventImpl put(Event.Type type, Object context)
-    {
-        if (_collector != null) {
-            return _collector.put(type, context);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    void localOpen()
-    {
-        put(Event.Type.CONNECTION_LOCAL_OPEN, this);
-    }
-
-    @Override
-    void localClose()
-    {
-        put(Event.Type.CONNECTION_LOCAL_CLOSE, this);
-    }
-
-    @Override
-    public Reactor getReactor() {
-        return _reactor;
-    }
-
-    public void setReactor(Reactor reactor) {
-        _reactor = reactor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
deleted file mode 100644
index 0bdb163..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.util.Arrays;
-
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.codec.ReadableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.Transport;
-
-public class DeliveryImpl implements Delivery
-{
-    public static final int DEFAULT_MESSAGE_FORMAT = 0;
-
-    private DeliveryImpl _linkPrevious;
-    private DeliveryImpl _linkNext;
-
-    private DeliveryImpl _workNext;
-    private DeliveryImpl _workPrev;
-    boolean _work;
-
-    private DeliveryImpl _transportWorkNext;
-    private DeliveryImpl _transportWorkPrev;
-    boolean _transportWork;
-
-    private Record _attachments;
-    private Object _context;
-
-    private final byte[] _tag;
-    private final LinkImpl _link;
-    private DeliveryState _deliveryState;
-    private boolean _settled;
-    private boolean _remoteSettled;
-    private DeliveryState _remoteDeliveryState;
-    private DeliveryState _defaultDeliveryState = null;
-    private int _messageFormat = DEFAULT_MESSAGE_FORMAT;
-
-    /**
-     * A bit-mask representing the outstanding work on this delivery received 
from the transport layer
-     * that has not yet been processed by the application.
-     */
-    private int _flags = (byte) 0;
-
-    private TransportDelivery _transportDelivery;
-    private byte[] _data;
-    private int _dataSize;
-    private boolean _complete;
-    private boolean _updated;
-    private boolean _done;
-    private int _offset;
-
-    DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl previous)
-    {
-        _tag = tag;
-        _link = link;
-        _link.incrementUnsettled();
-        _linkPrevious = previous;
-        if(previous != null)
-        {
-            previous._linkNext = this;
-        }
-    }
-
-    @Override
-    public byte[] getTag()
-    {
-        return _tag;
-    }
-
-    @Override
-    public LinkImpl getLink()
-    {
-        return _link;
-    }
-
-    @Override
-    public DeliveryState getLocalState()
-    {
-        return _deliveryState;
-    }
-
-    @Override
-    public DeliveryState getRemoteState()
-    {
-        return _remoteDeliveryState;
-    }
-
-    @Override
-    public boolean remotelySettled()
-    {
-        return _remoteSettled;
-    }
-
-    @Override
-    public void setMessageFormat(int messageFormat)
-    {
-        _messageFormat = messageFormat;
-    }
-
-    @Override
-    public int getMessageFormat()
-    {
-        return _messageFormat;
-    }
-
-    @Override
-    public void disposition(final DeliveryState state)
-    {
-        _deliveryState = state;
-        if(!_remoteSettled)
-        {
-            addToTransportWorkList();
-        }
-    }
-
-    @Override
-    public void settle()
-    {
-        if (_settled) {
-            return;
-        }
-
-        _settled = true;
-        _link.decrementUnsettled();
-        if(!_remoteSettled)
-        {
-            addToTransportWorkList();
-        }
-        else
-        {
-            _transportDelivery.settled();
-        }
-        if(_link.current() == this)
-        {
-            _link.advance();
-        }
-
-        _link.remove(this);
-        if(_linkPrevious != null)
-        {
-            _linkPrevious._linkNext = _linkNext;
-        }
-        if(_linkNext != null)
-        {
-            _linkNext._linkPrevious = _linkPrevious;
-        }
-        updateWork();
-    }
-
-    DeliveryImpl getLinkNext()
-    {
-        return _linkNext;
-    }
-
-    @Override
-    public DeliveryImpl next()
-    {
-        return getLinkNext();
-    }
-
-    @Override
-    public void free()
-    {
-        settle();
-    }
-
-    DeliveryImpl getLinkPrevious()
-    {
-        return _linkPrevious;
-    }
-
-    @Override
-    public DeliveryImpl getWorkNext()
-    {
-        if (_workNext != null)
-            return _workNext;
-        // the following hack is brought to you by the C implementation!
-        if (!_work)  // not on the work list
-            return _link.getConnectionImpl().getWorkHead();
-        return null;
-    }
-
-    DeliveryImpl getWorkPrev()
-    {
-        return _workPrev;
-    }
-
-
-    void setWorkNext(DeliveryImpl workNext)
-    {
-        _workNext = workNext;
-    }
-
-    void setWorkPrev(DeliveryImpl workPrev)
-    {
-        _workPrev = workPrev;
-    }
-
-    int recv(final byte[] bytes, int offset, int size)
-    {
-        final int consumed;
-        if (_data != null)
-        {
-            //TODO - should only be if no bytes left
-            consumed = Math.min(size, _dataSize);
-
-            System.arraycopy(_data, _offset, bytes, offset, consumed);
-            _offset += consumed;
-            _dataSize -= consumed;
-        }
-        else
-        {
-            _dataSize = consumed = 0;
-        }
-
-        return (_complete && consumed == 0) ? Transport.END_OF_STREAM : 
consumed;  //TODO - Implement
-    }
-
-    int recv(final WritableBuffer buffer) {
-        final int consumed;
-        if (_data != null)
-        {
-            consumed = Math.min(buffer.remaining(), _dataSize);
-
-            buffer.put(_data, _offset, consumed);
-            _offset += consumed;
-            _dataSize -= consumed;
-        }
-        else
-        {
-            _dataSize = consumed = 0;
-        }
-
-        return (_complete && consumed == 0) ? Transport.END_OF_STREAM : 
consumed;
-    }
-
-    void updateWork()
-    {
-        getLink().getConnectionImpl().workUpdate(this);
-    }
-
-    DeliveryImpl clearTransportWork()
-    {
-        DeliveryImpl next = _transportWorkNext;
-        getLink().getConnectionImpl().removeTransportWork(this);
-        return next;
-    }
-
-    void addToTransportWorkList()
-    {
-        getLink().getConnectionImpl().addTransportWork(this);
-    }
-
-
-    DeliveryImpl getTransportWorkNext()
-    {
-        return _transportWorkNext;
-    }
-
-
-    DeliveryImpl getTransportWorkPrev()
-    {
-        return _transportWorkPrev;
-    }
-
-    void setTransportWorkNext(DeliveryImpl transportWorkNext)
-    {
-        _transportWorkNext = transportWorkNext;
-    }
-
-    void setTransportWorkPrev(DeliveryImpl transportWorkPrev)
-    {
-        _transportWorkPrev = transportWorkPrev;
-    }
-
-    TransportDelivery getTransportDelivery()
-    {
-        return _transportDelivery;
-    }
-
-    void setTransportDelivery(TransportDelivery transportDelivery)
-    {
-        _transportDelivery = transportDelivery;
-    }
-
-    @Override
-    public boolean isSettled()
-    {
-        return _settled;
-    }
-
-    int send(byte[] bytes, int offset, int length)
-    {
-        if(_data == null)
-        {
-            _data = new byte[length];
-        }
-        else if(_data.length - _dataSize < length)
-        {
-            byte[] oldData = _data;
-            _data = new byte[oldData.length + _dataSize];
-            System.arraycopy(oldData, _offset, _data, 0, _dataSize);
-            _offset = 0;
-        }
-        System.arraycopy(bytes, offset, _data, _dataSize + _offset, length);
-        _dataSize += length;
-        addToTransportWorkList();
-        return length;  //TODO - Implement.
-    }
-
-    int send(final ReadableBuffer buffer)
-    {
-        int length = buffer.remaining();
-
-        if(_data == null)
-        {
-            _data = new byte[length];
-        }
-        else if(_data.length - _dataSize < length)
-        {
-            byte[] oldData = _data;
-            _data = new byte[oldData.length + _dataSize];
-            System.arraycopy(oldData, _offset, _data, 0, _dataSize);
-            _offset = 0;
-        }
-        buffer.get(_data, _offset, length);
-        _dataSize+=length;
-        addToTransportWorkList();
-        return length;
-    }
-
-    byte[] getData()
-    {
-        return _data;
-    }
-
-    int getDataOffset()
-    {
-        return _offset;
-    }
-
-    int getDataLength()
-    {
-        return _dataSize;  //TODO - Implement.
-    }
-
-    void setData(byte[] data)
-    {
-        _data = data;
-    }
-
-    void setDataLength(int length)
-    {
-        _dataSize = length;
-    }
-
-    public void setDataOffset(int arrayOffset)
-    {
-        _offset = arrayOffset;
-    }
-
-    @Override
-    public boolean isWritable()
-    {
-        return getLink() instanceof SenderImpl
-                && getLink().current() == this
-                && ((SenderImpl) getLink()).hasCredit();
-    }
-
-    @Override
-    public boolean isReadable()
-    {
-        return getLink() instanceof ReceiverImpl
-            && getLink().current() == this;
-    }
-
-    void setComplete()
-    {
-        _complete = true;
-    }
-
-    @Override
-    public boolean isPartial()
-    {
-        return !_complete;
-    }
-
-    void setRemoteDeliveryState(DeliveryState remoteDeliveryState)
-    {
-        _remoteDeliveryState = remoteDeliveryState;
-        _updated = true;
-    }
-
-    @Override
-    public boolean isUpdated()
-    {
-        return _updated;
-    }
-
-    @Override
-    public void clear()
-    {
-        _updated = false;
-        getLink().getConnectionImpl().workUpdate(this);
-    }
-
-
-    void setDone()
-    {
-        _done = true;
-    }
-
-    boolean isDone()
-    {
-        return _done;
-    }
-
-    void setRemoteSettled(boolean remoteSettled)
-    {
-        _remoteSettled = remoteSettled;
-        _updated = true;
-    }
-
-    @Override
-    public boolean isBuffered()
-    {
-        if (_remoteSettled) return false;
-        if (getLink() instanceof SenderImpl) {
-            if (isDone()) {
-                return false;
-            } else {
-                return _complete || _dataSize > 0;
-            }
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public Object getContext()
-    {
-        return _context;
-    }
-
-    @Override
-    public void setContext(Object context)
-    {
-        _context = context;
-    }
-
-    @Override
-    public Record attachments()
-    {
-        if(_attachments == null)
-        {
-            _attachments = new RecordImpl();
-        }
-
-        return _attachments;
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder builder = new StringBuilder();
-        builder.append("DeliveryImpl [_tag=").append(Arrays.toString(_tag))
-            .append(", _link=").append(_link)
-            .append(", _deliveryState=").append(_deliveryState)
-            .append(", _settled=").append(_settled)
-            .append(", _remoteSettled=").append(_remoteSettled)
-            .append(", _remoteDeliveryState=").append(_remoteDeliveryState)
-            .append(", _flags=").append(_flags)
-            .append(", _defaultDeliveryState=").append(_defaultDeliveryState)
-            .append(", _transportDelivery=").append(_transportDelivery)
-            .append(", _dataSize=").append(_dataSize)
-            .append(", _complete=").append(_complete)
-            .append(", _updated=").append(_updated)
-            .append(", _done=").append(_done)
-            .append(", _offset=").append(_offset).append("]");
-        return builder.toString();
-    }
-
-    @Override
-    public int pending()
-    {
-        return _dataSize;
-    }
-
-    @Override
-    public void setDefaultDeliveryState(DeliveryState state)
-    {
-        _defaultDeliveryState = state;
-    }
-
-    @Override
-    public DeliveryState getDefaultDeliveryState()
-    {
-        return _defaultDeliveryState;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
deleted file mode 100644
index bbcc9d9..0000000
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.ProtonJEndpoint;
-import org.apache.qpid.proton.engine.Record;
-
-/**
- * Shared base for engine endpoints: holds local/remote state and error
- * conditions, a reference count, the "modified" flag with its transport-list
- * links, and the application context and attachments.
- */
-public abstract class EndpointImpl implements ProtonJEndpoint
-{
-    private EndpointState _localState = EndpointState.UNINITIALIZED;
-    private EndpointState _remoteState = EndpointState.UNINITIALIZED;
-    private ErrorCondition _localError = new ErrorCondition();
-    private ErrorCondition _remoteError = new ErrorCondition();
-    private boolean _modified;
-    private EndpointImpl _transportNext;
-    private EndpointImpl _transportPrev;
-    private Object _context;
-    private Record _attachments = new RecordImpl();
-
-    private int refcount = 1;
-    boolean freed = false;
-
-    /** Adds one reference. */
-    void incref() {
-        refcount++;
-    }
-
-    /**
-     * Drops one reference. Reaching zero triggers {@link #postFinal()};
-     * going below zero is an {@link IllegalStateException}.
-     */
-    void decref() {
-        final int remaining = --refcount;
-        if (remaining < 0) {
-            throw new IllegalStateException();
-        }
-        if (remaining == 0) {
-            postFinal();
-        }
-    }
-
-    /** Hook invoked when the reference count reaches zero. */
-    abstract void postFinal();
-
-    /** Hook invoked by {@link #open()} after the local state became ACTIVE. */
-    abstract void localOpen();
-
-    /** Hook invoked by {@link #close()} after the local state became CLOSED. */
-    abstract void localClose();
-
-    /** Moves the local state to ACTIVE; does nothing if it already is. */
-    @Override
-    public void open()
-    {
-        if (getLocalState() == EndpointState.ACTIVE)
-        {
-            return;
-        }
-        _localState = EndpointState.ACTIVE;
-        localOpen();
-        modified();
-    }
-
-    /** Moves the local state to CLOSED; does nothing if it already is. */
-    @Override
-    public void close()
-    {
-        if (getLocalState() == EndpointState.CLOSED)
-        {
-            return;
-        }
-        _localState = EndpointState.CLOSED;
-        localClose();
-        modified();
-    }
-
-    @Override
-    public EndpointState getLocalState()
-    {
-        return _localState;
-    }
-
-    @Override
-    public EndpointState getRemoteState()
-    {
-        return _remoteState;
-    }
-
-    /** Returns the local error condition object (never replaced, only mutated). */
-    @Override
-    public ErrorCondition getCondition()
-    {
-        return _localError;
-    }
-
-    /**
-     * Copies {@code condition} into the local error condition, or clears the
-     * local condition when {@code condition} is {@code null}.
-     */
-    @Override
-    public void setCondition(ErrorCondition condition)
-    {
-        if (condition == null)
-        {
-            _localError.clear();
-            return;
-        }
-        _localError.copyFrom(condition);
-    }
-
-    @Override
-    public ErrorCondition getRemoteCondition()
-    {
-        return _remoteError;
-    }
-
-    /** Sets the local state directly, without firing hooks or marking modified. */
-    void setLocalState(EndpointState localState)
-    {
-        _localState = localState;
-    }
-
-    void setRemoteState(EndpointState remoteState)
-    {
-        // TODO - check state change legal
-        _remoteState = remoteState;
-    }
-
-    /** Marks the endpoint modified and emits a TRANSPORT event. */
-    void modified()
-    {
-        modified(true);
-    }
-
-    /**
-     * Registers this endpoint with the connection as modified (once), and
-     * optionally emits a TRANSPORT event when the connection has a transport.
-     *
-     * @param emit whether to emit the TRANSPORT event
-     */
-    void modified(boolean emit)
-    {
-        if (!_modified)
-        {
-            _modified = true;
-            getConnectionImpl().addModified(this);
-        }
-
-        if (!emit)
-        {
-            return;
-        }
-
-        final ConnectionImpl connection = getConnectionImpl();
-        final TransportImpl transport = connection.getTransport();
-        if (transport != null)
-        {
-            connection.put(Event.Type.TRANSPORT, transport);
-        }
-    }
-
-    protected abstract ConnectionImpl getConnectionImpl();
-
-    /** Clears the modified flag and deregisters from the connection, if set. */
-    void clearModified()
-    {
-        if (!_modified)
-        {
-            return;
-        }
-        _modified = false;
-        getConnectionImpl().removeModified(this);
-    }
-
-    boolean isModified()
-    {
-        return _modified;
-    }
-
-    EndpointImpl transportNext()
-    {
-        return _transportNext;
-    }
-
-    EndpointImpl transportPrev()
-    {
-        return _transportPrev;
-    }
-
-    /** Subclass-specific part of {@link #free()}. */
-    abstract void doFree();
-
-    /** Frees the endpoint once: runs {@link #doFree()} and drops one reference. */
-    @Override
-    public final void free()
-    {
-        if (!freed)
-        {
-            freed = true;
-
-            doFree();
-            decref();
-        }
-    }
-
-    void setTransportNext(EndpointImpl transportNext)
-    {
-        _transportNext = transportNext;
-    }
-
-    void setTransportPrev(EndpointImpl transportPrevious)
-    {
-        _transportPrev = transportPrevious;
-    }
-
-    @Override
-    public Object getContext()
-    {
-        return _context;
-    }
-
-    @Override
-    public void setContext(Object context)
-    {
-        _context = context;
-    }
-
-    @Override
-    public Record attachments()
-    {
-        return _attachments;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImplQuery.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImplQuery.java
 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImplQuery.java
deleted file mode 100644
index 33519b1..0000000
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImplQuery.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.qpid.proton.engine.impl;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-
-
-import java.util.EnumSet;
-import org.apache.qpid.proton.engine.EndpointState;
-
-/**
- * Query that accepts a node when its endpoint's local and remote states are
- * in the given sets. A {@code null} set places no constraint on that side.
- */
-class EndpointImplQuery<T extends EndpointImpl> implements LinkNode.Query<T>
-{
-    private final EnumSet<EndpointState> _local;
-    private final EnumSet<EndpointState> _remote;
-
-    EndpointImplQuery(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
-    {
-        _local = local;
-        _remote = remote;
-    }
-
-    public boolean matches(LinkNode<T> node)
-    {
-        // Reject as soon as the local-state constraint fails.
-        if (_local != null && !_local.contains(node.getValue().getLocalState()))
-        {
-            return false;
-        }
-        return _remote == null || _remote.contains(node.getValue().getRemoteState());
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
deleted file mode 100644
index 3bcecb5..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.util.Iterator;
-
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.EventType;
-import org.apache.qpid.proton.engine.Handler;
-import org.apache.qpid.proton.engine.HandlerException;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Selectable;
-import org.apache.qpid.proton.reactor.Task;
-import org.apache.qpid.proton.reactor.impl.ReactorImpl;
-
-/**
- * EventImpl
- *
- */
-
-class EventImpl implements Event
-{
-
-    EventType type;
-    Object context;
-    EventImpl next;
-    RecordImpl attachments = new RecordImpl();
-
-    // Handler whose children still have to be offered this event; null once delegated.
-    private Handler delegated = null;
-
-    EventImpl()
-    {
-        this.type = null;
-    }
-
-    /** Re-initialises this event with a type and context and empties its attachments. */
-    void init(EventType type, Object context)
-    {
-        this.type = type;
-        this.context = context;
-        this.attachments.clear();
-    }
-
-    /** Resets type and context to null and empties the attachments. */
-    void clear()
-    {
-        type = null;
-        context = null;
-        attachments.clear();
-    }
-
-    @Override
-    public EventType getEventType()
-    {
-        return type;
-    }
-
-    /** Returns the core type, or {@code NON_CORE_EVENT} when the event type is not a core {@code Type}. */
-    @Override
-    public Type getType() {
-        return (type instanceof Type) ? (Type) type : Type.NON_CORE_EVENT;
-    }
-
-    @Override
-    public Object getContext()
-    {
-        return context;
-    }
-
-    @Override
-    public Handler getRootHandler() {
-        return ReactorImpl.ROOT.get(this);
-    }
-
-    /**
-     * Hands this event to {@code handler} and then to its children, unless
-     * the handler already delegated itself. Runtime exceptions other than
-     * {@link HandlerException} are wrapped in a HandlerException. The
-     * previous delegation target is restored on exit.
-     */
-    @Override
-    public void dispatch(Handler handler) throws HandlerException
-    {
-        final Handler previous = delegated;
-        delegated = handler;
-        try {
-            try {
-                handler.handle(this);
-            } catch (HandlerException alreadyWrapped) {
-                throw alreadyWrapped;
-            } catch (RuntimeException failure) {
-                throw new HandlerException(handler, failure);
-            }
-            delegate();
-        } finally {
-            delegated = previous;
-        }
-    }
-
-    /**
-     * Dispatches this event to each child of the handler currently being
-     * dispatched to; a second call within the same dispatch does nothing.
-     */
-    @Override
-    public void delegate() throws HandlerException
-    {
-        if (delegated == null) {
-            return; // short circuit
-        }
-        final Iterator<Handler> kids = delegated.children();
-        // Cleared before recursing so the enclosing dispatch does not delegate again.
-        delegated = null;
-        while (kids.hasNext()) {
-            dispatch(kids.next());
-        }
-    }
-
-    /** Dispatches this event to {@code handler} with its type temporarily replaced by {@code as_type}. */
-    @Override
-    public void redispatch(EventType as_type, Handler handler) throws HandlerException
-    {
-        if (!as_type.isValid()) {
-            throw new IllegalArgumentException("Can only redispatch valid event types");
-        }
-        final EventType originalType = type;
-        type = as_type;
-        try {
-            dispatch(handler);
-        }
-        finally {
-            type = originalType;
-        }
-    }
-
-    @Override
-    public Connection getConnection()
-    {
-        if (context instanceof Connection) {
-            return (Connection) context;
-        }
-        if (context instanceof Transport) {
-            final Transport transport = getTransport();
-            if (transport == null) {
-                return null;
-            }
-            return ((TransportImpl) transport).getConnectionImpl();
-        }
-        final Session session = getSession();
-        if (session == null) {
-            return null;
-        }
-        return session.getConnection();
-    }
-
-    @Override
-    public Session getSession()
-    {
-        if (context instanceof Session) {
-            return (Session) context;
-        }
-        final Link link = getLink();
-        if (link == null) {
-            return null;
-        }
-        return link.getSession();
-    }
-
-    @Override
-    public Link getLink()
-    {
-        if (context instanceof Link) {
-            return (Link) context;
-        }
-        final Delivery delivery = getDelivery();
-        if (delivery == null) {
-            return null;
-        }
-        return delivery.getLink();
-    }
-
-    @Override
-    public Sender getSender()
-    {
-        if (context instanceof Sender) {
-            return (Sender) context;
-        }
-        final Link link = getLink();
-        if (link instanceof Sender) {
-            return (Sender) link;
-        }
-        return null;
-    }
-
-    @Override
-    public Receiver getReceiver()
-    {
-        if (context instanceof Receiver) {
-            return (Receiver) context;
-        }
-        final Link link = getLink();
-        if (link instanceof Receiver) {
-            return (Receiver) link;
-        }
-        return null;
-    }
-
-    @Override
-    public Delivery getDelivery()
-    {
-        if (context instanceof Delivery) {
-            return (Delivery) context;
-        }
-        return null;
-    }
-
-    @Override
-    public Transport getTransport()
-    {
-        if (context instanceof Transport) {
-            return (Transport) context;
-        }
-        if (context instanceof Connection) {
-            return ((Connection)context).getTransport();
-        }
-
-        final Session session = getSession();
-        if (session == null) {
-            return null;
-        }
-
-        final Connection connection = session.getConnection();
-        if (connection == null) {
-            return null;
-        }
-
-        return connection.getTransport();
-    }
-
-    @Override
-    public Selectable getSelectable() {
-        if (context instanceof Selectable) {
-            return (Selectable) context;
-        }
-        return null;
-    }
-
-    /** Resolves the reactor from the context; the first matching context kind wins. */
-    @Override
-    public Reactor getReactor() {
-        if (context instanceof Reactor) {
-            return (Reactor) context;
-        }
-        if (context instanceof Task) {
-            return ((Task)context).getReactor();
-        }
-        if (context instanceof Transport) {
-            return ((TransportImpl)context).getReactor();
-        }
-        if (context instanceof Delivery) {
-            return ((Delivery)context).getLink().getSession().getConnection().getReactor();
-        }
-        if (context instanceof Link) {
-            return ((Link)context).getSession().getConnection().getReactor();
-        }
-        if (context instanceof Session) {
-            return ((Session)context).getConnection().getReactor();
-        }
-        if (context instanceof Connection) {
-            return ((Connection)context).getReactor();
-        }
-        if (context instanceof Selectable) {
-            return ((Selectable)context).getReactor();
-        }
-        return null;
-    }
-
-    @Override
-    public Task getTask() {
-        if (context instanceof Task) {
-            return (Task) context;
-        }
-        return null;
-    }
-
-    @Override
-    public Record attachments() {
-        return attachments;
-    }
-
-    /** Returns a new event with the same type and context and a copy of the attachments. */
-    @Override
-    public Event copy()
-    {
-       final EventImpl duplicate = new EventImpl();
-       duplicate.init(type, context);
-       duplicate.attachments.copy(attachments);
-       return duplicate;
-    }
-
-    @Override
-    public String toString()
-    {
-        return new StringBuilder("EventImpl{type=")
-            .append(type)
-            .append(", context=")
-            .append(context)
-            .append('}')
-            .toString();
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java
deleted file mode 100644
index dfbb201..0000000
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.engine.TransportException;
-import org.apache.qpid.proton.framing.TransportFrame;
-
-/**
- * Receiver of {@link TransportFrame}s plus a close notification.
- * NOTE(review): presumably implemented by the transport and driven by the
- * frame parser - confirm against the implementing classes.
- */
-public interface FrameHandler
-{
-    /**
-     * Handles one transport frame.
-     *
-     * @param frame the frame to handle
-     * @throws IllegalStateException if I am not currently accepting input
-     * @see #isHandlingFrames()
-     * @return false on end of stream
-     */
-    boolean handleFrame(TransportFrame frame);
-
-    /**
-     * Notifies the handler that the frame source has closed.
-     *
-     * @param error the associated transport error
-     *        (NOTE(review): whether this may be null on a clean close is not visible here - confirm)
-     */
-    void closed(TransportException error);
-
-    /**
-     * Returns whether I am currently able to handle frames.
-     * MUST be checked before calling {@link #handleFrame(TransportFrame)}.
-     */
-    boolean isHandlingFrames();
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
----------------------------------------------------------------------
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
deleted file mode 100644
index 6aede84..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.engine.impl;
-
-import static org.apache.qpid.proton.engine.impl.AmqpHeader.HEADER;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newWriteableBuffer;
-
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.transport.EmptyFrame;
-import org.apache.qpid.proton.amqp.transport.FrameBody;
-import org.apache.qpid.proton.codec.ByteBufferDecoder;
-import org.apache.qpid.proton.codec.DecodeException;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.TransportException;
-import org.apache.qpid.proton.framing.TransportFrame;
-
-class FrameParser implements TransportInput
-{
-    private static final Logger TRACE_LOGGER = Logger.getLogger("proton.trace");
-
-    private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0);
-
-    private enum State
-    {
-        HEADER0,
-        HEADER1,
-        HEADER2,
-        HEADER3,
-        HEADER4,
-        HEADER5,
-        HEADER6,
-        HEADER7,
-        SIZE_0,
-        SIZE_1,
-        SIZE_2,
-        SIZE_3,
-        PRE_PARSE,
-        BUFFERING,
-        PARSING,
-        ERROR
-    }
-
-    private final FrameHandler _frameHandler;
-    private final ByteBufferDecoder _decoder;
-    private final int _inputBufferSize;
-    private final int _localMaxFrameSize;
-
-    private ByteBuffer _inputBuffer = null;
-    private boolean _tail_closed = false;
-
-    private State _state = State.HEADER0;
-
-    private long _framesInput = 0;
-
-    /** the stated size of the current frame */
-    private int _size;
-
-    /** holds the current frame that is being parsed */
-    private ByteBuffer _frameBuffer;
-
-    private TransportFrame _heldFrame;
-    private TransportException _parsingError;
-
-
-    /**
-     * We store the last result when processing input so that
-     * we know not to process any more input if it was an error.
-     */
-    FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize)
-    {
-        _frameHandler = frameHandler;
-        _decoder = decoder;
-        _localMaxFrameSize = localMaxFrameSize;
-        _inputBufferSize = _localMaxFrameSize > 0 ? _localMaxFrameSize : 4*1024;
-    }
-
-    private void input(ByteBuffer in) throws TransportException
-    {
-        flushHeldFrame();
-        if (_heldFrame != null)
-        {
-            return;
-        }
-
-        TransportException frameParsingError = null;
-        int size = _size;
-        State state = _state;
-        ByteBuffer oldIn = null;
-
-        boolean transportAccepting = true;
-
-        while(in.hasRemaining() && state != State.ERROR && transportAccepting)
-        {
-            switch(state)
-            {
-                case HEADER0:
-                    if(in.hasRemaining())
-                    {
-                        byte c = in.get();
-                        if(c != HEADER[0])
-                        {
-                            frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[0], state);
-                            state = State.ERROR;
-                            break;
-                        }
-                        state = State.HEADER1;
-                    }
-                    else
-                    {
-                        break;
-                    }
-                case HEADER1:
-                    if(in.hasRemaining())
-                    {
-                        byte c = in.get();
-                        if(c != HEADER[1])
-                        {
-                            frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[1], state);
-                            state = State.ERROR;
-                            break;
-                        }
-                        state = State.HEADER2;
-                    }
-                    else
-                    {
-                        break;
-                    }
-                case HEADER2:
-                    if(in.hasRemaining())
-                    {
-                        byte c = in.get();
-                        if(c != HEADER[2])
-                        {
-                            frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[2], state);
-                            state = State.ERROR;
-                            break;
-                        }
-                        state = State.HEADER3;
-                    }
-                    else
-                    {
-                        break;
-                    }
-                case HEADER3:
-                    if(in.hasRemaining())
-                    {
-                        byte c = in.get();
-                        if(c != HEADER[3])
-                        {
-                            frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[3], state);
-                            state = State.ERROR;
-                            break;
-                        }
-                        state = State.HEADER4;
-                    }
-                    else
-                    {
-                        break;
-                    }
-                case HEADER4:
-                    if(in.hasRemaining())
-                    {
-                        byte c = in.get();
-                        if(c != HEADER[4])
-                        {
-                            frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[4], state);
-                            state = State.ERROR;
-                            break;
-                        }
-                        state = State.HEADER5;
-                    }
-                    else
-                    {
-                        break;
-                    }
-                case HEADER5:
-                    if(in.hasRemaining())
-                    {
-                        byte c = in.get();
-                        if(c != HEADER[5])
-                        {
-                            frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[5], state);
-                            state = State.ERROR;
-                            break;
-                        }
-                        state = State.HEADER6;
-                    }
-                    else
-                    {
-                        break;
-                    }
-                case HEADER6:
-                    if(in.hasRemaining())
-                    {
-                        byte c = in.get();
-                        if(c != HEADER[6])
-                        {
-                            frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[6], state);
-                            state = State.ERROR;
-                            break;
-                        }
-                        state = State.HEADER7;
-                    }
-                    else
-                    {
-                        break;
-                    }
-                case HEADER7:
-                    if(in.hasRemaining())
-                    {
-                        byte c = in.get();
-                        if(c != HEADER[7])
-                        {
-                            frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[7], state);
-                            state = State.ERROR;
-                            break;
-                        }
-                        state = State.SIZE_0;
-                    }
-                    else
-                    {
-                        break;
-                    }
-                case SIZE_0:
-                    if(!in.hasRemaining())
-                    {
-                        break;
-                    }
-                    if(in.remaining() >= 4)
-                    {
-                        size = in.getInt();
-                        state = State.PRE_PARSE;
-                        break;
-                    }
-                    else
-                    {
-                        size = (in.get() << 24) & 0xFF000000;
-                        if(!in.hasRemaining())
-                        {
-                            state = State.SIZE_1;
-                            break;
-                        }
-                    }
-                case SIZE_1:
-                    size |= (in.get() << 16) & 0xFF0000;
-                    if(!in.hasRemaining())
-                    {
-                        state = State.SIZE_2;
-                        break;
-                    }
-                case SIZE_2:
-                    size |= (in.get() << 8) & 0xFF00;
-                    if(!in.hasRemaining())
-                    {
-                        state = State.SIZE_3;
-                        break;
-                    }
-                case SIZE_3:
-                    size |= in.get() & 0xFF;
-                    state = State.PRE_PARSE;
-
-                case PRE_PARSE:
-                    if(size < 8)
-                    {
-                        frameParsingError = new TransportException("specified frame size %d smaller than minimum frame header "
-                                                         + "size %d",
-                                                         size, 8);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    if (_localMaxFrameSize > 0 && size > _localMaxFrameSize)
-                    {
-                        frameParsingError = new TransportException("specified frame size %d greater than maximum valid frame size %d",
-                                                                   size, _localMaxFrameSize);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    if(in.remaining() < size-4)
-                    {
-                        _frameBuffer = ByteBuffer.allocate(size-4);
-                        _frameBuffer.put(in);
-                        state = State.BUFFERING;
-                        break;
-                    }
-                case BUFFERING:
-                    if(_frameBuffer != null)
-                    {
-                        if(in.remaining() < _frameBuffer.remaining())
-                        {
-                            _frameBuffer.put(in);
-                            break;
-                        }
-                        else
-                        {
-                            ByteBuffer dup = in.duplicate();
-                            dup.limit(dup.position()+_frameBuffer.remaining());
-                            in.position(in.position()+_frameBuffer.remaining());
-                            _frameBuffer.put(dup);
-                            oldIn = in;
-                            _frameBuffer.flip();
-                            in = _frameBuffer;
-                            state = State.PARSING;
-                        }
-                    }
-
-                case PARSING:
-
-                    int dataOffset = (in.get() << 2) & 0x3FF;
-
-                    if(dataOffset < 8)
-                    {
-                        frameParsingError = new TransportException("specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8);
-                        state = State.ERROR;
-                        break;
-                    }
-                    else if(dataOffset > size)
-                    {
-                        frameParsingError = new TransportException("specified frame data offset %d larger than the frame size %d", dataOffset, _size);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    // type
-
-                    int type = in.get() & 0xFF;
-                    int channel = in.getShort() & 0xFFFF;
-
-                    if(type != 0)
-                    {
-                        frameParsingError = new TransportException("unknown frame type: %d", type);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    // note that this skips over the extended header if it's present
-                    if(dataOffset!=8)
-                    {
-                        in.position(in.position()+dataOffset-8);
-                    }
-
-                    // oldIn null iff not working on duplicated buffer
-                    final int frameBodySize = size - dataOffset;
-                    if(oldIn == null)
-                    {
-                        oldIn = in;
-                        in = in.duplicate();
-                        final int endPos = in.position() + frameBodySize;
-                        in.limit(endPos);
-                        oldIn.position(endPos);
-
-                    }
-
-                    try
-                    {
-                        _framesInput += 1;
-
-                        Binary payload = null;
-                        Object val = null;
-
-                        if (frameBodySize > 0)
-                        {
-                            _decoder.setByteBuffer(in);
-                            val = _decoder.readObject();
-                            _decoder.setByteBuffer(null);
-
-                            if(in.hasRemaining())
-                            {
-                                byte[] payloadBytes = new byte[in.remaining()];
-                                in.get(payloadBytes);
-                                payload = new Binary(payloadBytes);
-                            }
-                            else
-                            {
-                                payload = null;
-                            }
-                        }
-                        else
-                        {
-                            val = new EmptyFrame();
-                        }
-
-                        if(val instanceof FrameBody)
-                        {
-                            FrameBody frameBody = (FrameBody) val;
-                            if(TRACE_LOGGER.isLoggable(Level.FINE))
-                            {
-                                TRACE_LOGGER.log(Level.FINE, "IN: CH["+channel+"] : " + frameBody + (payload == null ? "" : "[" + payload + "]"));
-                            }
-                            TransportFrame frame = new TransportFrame(channel, frameBody, payload);
-
-                            if(_frameHandler.isHandlingFrames())
-                            {
-                                _tail_closed = _frameHandler.handleFrame(frame);
-                            }
-                            else
-                            {
-                                transportAccepting = false;
-                                _heldFrame = frame;
-                            }
-                        }
-                        else
-                        {
-                            throw new TransportException("Frameparser encountered a "
-                                    + (val == null? "null" : val.getClass())
-                                    + " which is not a " + FrameBody.class);
-                        }
-
-                        reset();
-                        in = oldIn;
-                        oldIn = null;
-                        _frameBuffer = null;
-                        state = State.SIZE_0;
-                    }
-                    catch (DecodeException ex)
-                    {
-                        state = State.ERROR;
-                        frameParsingError = new TransportException(ex);
-                    }
-                    break;
-                case ERROR:
-                    // do nothing
-            }
-
-        }
-
-        if (_tail_closed)
-        {
-            if (in.hasRemaining()) {
-                state = State.ERROR;
-                frameParsingError = new TransportException("framing error");
-            } else if (state != State.SIZE_0) {
-                state = State.ERROR;
-                frameParsingError = new TransportException("connection aborted");
-            } else {
-                _frameHandler.closed(null);
-            }
-        }
-
-        _state = state;
-        _size = size;
-
-        if(_state == State.ERROR)
-        {
-            _tail_closed = true;
-            if(frameParsingError != null)
-            {
-                _parsingError = frameParsingError;
-                _frameHandler.closed(frameParsingError);
-            }
-            else
-            {
-                throw new TransportException("Unable to parse, probably because of a previous error");
-            }
-        }
-    }
-
-    @Override
-    public int capacity()
-    {
-        if (_tail_closed) {
-            return Transport.END_OF_STREAM;
-        } else {
-            if (_inputBuffer != null) {
-                return _inputBuffer.remaining();
-            } else {
-                return _inputBufferSize;
-            }
-        }
-    }
-
-    @Override
-    public int position() {
-        if (_tail_closed) {
-            return Transport.END_OF_STREAM;
-        }
-        return (_inputBuffer == null) ? 0 : _inputBuffer.position();
-    }
-
-    @Override
-    public ByteBuffer tail()
-    {
-        if (_tail_closed) {
-            throw new TransportException("tail closed");
-        }
-
-        if (_inputBuffer == null) {
-            _inputBuffer = newWriteableBuffer(_inputBufferSize);
-        }
-
-        return _inputBuffer;
-    }
-
-    @Override
-    public void process() throws TransportException
-    {
-        if (_inputBuffer != null)
-        {
-            _inputBuffer.flip();
-
-            try
-            {
-                input(_inputBuffer);
-            }
-            finally
-            {
-                if (_inputBuffer.hasRemaining()) {
-                    _inputBuffer.compact();
-                } else if (_inputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) {
-                    _inputBuffer = null;
-                } else {
-                    _inputBuffer.clear();
-                }
-            }
-        }
-        else
-        {
-            input(_emptyInputBuffer);
-        }
-    }
-
-    @Override
-    public void close_tail()
-    {
-        _tail_closed = true;
-        process();
-    }
-
-    /**
-     * Attempt to flush any cached data to the frame transport.  This function
-     * is useful if the {@link FrameHandler} state has changed.
-     */
-    public void flush()
-    {
-        flushHeldFrame();
-
-        if (_heldFrame == null)
-        {
-            process();
-        }
-    }
-
-    private void flushHeldFrame()
-    {
-        if(_heldFrame != null && _frameHandler.isHandlingFrames())
-        {
-            _tail_closed = _frameHandler.handleFrame(_heldFrame);
-            _heldFrame = null;
-        }
-    }
-
-    private void reset()
-    {
-        _size = 0;
-        _state = State.SIZE_0;
-    }
-
-    long getFramesInput()
-    {
-        return _framesInput;
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to