Added: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1687962&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 (added)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 Sat Jun 27 21:13:25 2015
@@ -0,0 +1,695 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.PrivilegedAction;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.Subject;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.amqp_1_0.codec.FrameWriter;
+import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
+import org.apache.qpid.amqp_1_0.framing.AMQFrame;
+import org.apache.qpid.amqp_1_0.framing.FrameHandler;
+import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
+import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
+import org.apache.qpid.amqp_1_0.transport.SaslServerProvider;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import 
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
+import 
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+public class AMQPConnection_1_0 extends 
AbstractAMQPConnection<AMQPConnection_1_0>
+        implements FrameOutputHandler
+{
+
+    /** Logger used by this class for errors raised while reading from or closing the connection. */
+    public static Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class);
+
+    /**
+     * Number of milliseconds added to the current time when {@link #close()} schedules its
+     * {@link ConnectionClosingTicker}. The misspelt name is kept because the constant is public.
+     */
+    public static final long CLOSE_REPONSE_TIMEOUT = 10000L;
+
+    /** Value of System.currentTimeMillis() at the most recent call to received(ByteBuffer). */
+    private volatile long _lastReadTime;
+    /** Value of System.currentTimeMillis() at the most recent frame send. */
+    private volatile long _lastWriteTime;
+    /** Value of System.currentTimeMillis() when this object was constructed; never reassigned. */
+    private final long _createTime = System.currentTimeMillis();
+    /** Endpoint obtained from the wrapped Connection_1_0 in the constructor. */
+    private ConnectionEndpoint _endpoint;
+    /** Set by notifyWork(), cleared by clearWork(), reported by hasWork(). */
+    private final AtomicBoolean _stateChanged = new AtomicBoolean();
+    /** Listener invoked by notifyWork(); may be unset (null). */
+    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+
+
+    /**
+     * Eight-octet header sent when the SASL layer is negotiated: the letters "AMQP" followed by
+     * the octets 3, 1, 0, 0. Shared instance; senders in this class transmit a duplicate().
+     */
+    private static final ByteBuffer SASL_LAYER_HEADER =
+            ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 3, 1, 0, 0});
+
+    /**
+     * Eight-octet header sent for the plain AMQP layer: the letters "AMQP" followed by
+     * the octets 0, 1, 0, 0. Shared instance; senders in this class transmit a duplicate().
+     */
+    private static final ByteBuffer AMQP_LAYER_HEADER =
+            ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0, 1, 0, 0});
+
+
+    /** Encodes outgoing frames; created in the constructor from the endpoint's type registry. */
+    private FrameWriter _frameWriter;
+    /** Current parser for incoming bytes; replaced by whatever each parse(...) call returns. */
+    private ProtocolHandler _frameHandler;
+    /** Monitor held while a frame is encoded and handed to the sender. */
+    private Object _sendLock = new Object();
+    /** Version octets captured from the peer's protocol header by received(ByteBuffer). */
+    private byte _major;
+    private byte _minor;
+    private byte _revision;
+    /** The connection model object created by createConnection(...). */
+    private Connection_1_0 _connection;
+    private volatile boolean _transportBlockedForWriting;
+
+
+    /**
+     * Position within the incoming byte stream: one constant per octet of the eight-octet
+     * protocol header that is still awaited, then FRAME once the header has been consumed.
+     */
+    enum State
+    {
+        A, M, Q, P,
+        PROTOCOL,
+        MAJOR, MINOR, REVISION,
+        FRAME
+    }
+
+    /** Next thing received(ByteBuffer) expects; starts at the first header octet. */
+    private State _state = State.A;
+
+    /** Thread that suspended message assignment, or null when assignment is not suspended. */
+    private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
+
+
+
+    /**
+     * Builds the protocol engine for one network connection: creates the Connection_1_0 model
+     * and its endpoint, wires this object in as frame output handler, sends the appropriate
+     * protocol header to the peer and, when SASL is in use, starts the SASL exchange.
+     *
+     * @param useSASL true to answer with the SASL header and start SASL negotiation, false to
+     *                answer with the plain AMQP header
+     */
+    public AMQPConnection_1_0(final Broker<?> broker, final NetworkConnection network,
+                              AmqpPort<?> port, Transport transport, long id,
+                              final AggregateTicker aggregateTicker,
+                              final boolean useSASL)
+    {
+        super(broker, network, port, transport, id, aggregateTicker);
+
+        _connection = createConnection(broker, network, port, transport, id, useSASL);
+        _connection.setAmqpConnection(this);
+
+        _endpoint = _connection.getConnectionEndpoint();
+        _endpoint.setConnectionEventListener(_connection);
+        _endpoint.setFrameOutputHandler(this);
+
+        final SubjectCreator subjectCreator =
+                port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
+        final List<String> saslMechanisms = subjectCreator.getMechanisms();
+
+        // Selecting the layer also installs the matching frame handler.
+        final ByteBuffer header;
+        if (useSASL)
+        {
+            header = initiateSasl();
+        }
+        else
+        {
+            header = initiateNonSasl(saslMechanisms);
+        }
+
+        _frameWriter = new FrameWriter(_endpoint.getDescribedTypeRegistry());
+
+        // The header buffers are shared constants, so only a duplicate is handed to the sender.
+        getSender().send(header.duplicate());
+        getSender().flush();
+
+        if (useSASL)
+        {
+            _endpoint.initiateSASL(saslMechanisms.toArray(new String[0]));
+        }
+    }
+
+    /**
+     * Creates the Connection_1_0 model object together with its ConnectionEndpoint.
+     * The endpoint is given a frame-receipt logger backed by the "FRM" logger, the broker's
+     * product/version/build/instance-name properties and the peer's remote address.
+     *
+     * @param useSASL when true the endpoint is created with a SaslServerProvider built from the
+     *                port's subject creator; otherwise it is created with a null provider
+     * @return the newly created connection model
+     */
+    public static Connection_1_0 createConnection(final Broker<?> broker,
+                                                  final NetworkConnection network,
+                                                  final AmqpPort<?> port,
+                                                  final Transport transport,
+                                                  final long id,
+                                                  final boolean useSASL)
+    {
+        final Container brokerContainer = new Container(broker.getId().toString());
+
+        final SubjectCreator subjectCreator =
+                port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
+
+        final SaslServerProvider saslProvider;
+        if (useSASL)
+        {
+            saslProvider = asSaslServerProvider(subjectCreator, network);
+        }
+        else
+        {
+            saslProvider = null;
+        }
+
+        final ConnectionEndpoint endpoint = new ConnectionEndpoint(brokerContainer, saslProvider);
+        endpoint.setLogger(new ConnectionEndpoint.FrameReceiptLogger()
+        {
+            @Override
+            public boolean isEnabled()
+            {
+                return FRAME_LOGGER.isDebugEnabled();
+            }
+
+            @Override
+            public void received(final SocketAddress remoteAddress, final short channel, final Object frame)
+            {
+                FRAME_LOGGER.debug("RECV[" + remoteAddress + "|" + channel + "] : " + frame);
+            }
+        });
+
+        // LinkedHashMap keeps the properties in the order they are added here.
+        final Map<Symbol,Object> brokerProperties = new LinkedHashMap<>();
+        brokerProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), QpidProperties.getProductName());
+        brokerProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), QpidProperties.getReleaseVersion());
+        brokerProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), QpidProperties.getBuildVersion());
+        brokerProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName());
+        endpoint.setProperties(brokerProperties);
+
+        endpoint.setRemoteAddress(network.getRemoteAddress());
+
+        return new Connection_1_0(endpoint, id, port, transport, subjectCreator);
+    }
+
+
+    /**
+     * @return the sender of the underlying network connection, looked up afresh on every call
+     */
+    public ByteBufferSender getSender()
+    {
+        return getNetwork().getSender();
+    }
+
+    /**
+     * Prepares the connection for use without a SASL layer.
+     * <p>
+     * The user principal is taken from the network peer principal when the EXTERNAL mechanism is
+     * offered and a peer principal exists; otherwise the anonymous principal is used when the
+     * ANONYMOUS mechanism is offered; otherwise the network connection is closed. In every case
+     * a plain FrameHandler is installed afterwards.
+     *
+     * @param mechanisms mechanism names offered by the port's subject creator
+     * @return the shared AMQP-layer header buffer (not a copy)
+     */
+    public ByteBuffer initiateNonSasl(final List<String> mechanisms)
+    {
+        if (mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME)
+            && getNetwork().getPeerPrincipal() != null)
+        {
+            _connection.setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
+        }
+        else if (mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
+        {
+            _connection.setUserPrincipal(
+                    new AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
+        }
+        else
+        {
+            // Neither EXTERNAL with a peer principal nor ANONYMOUS is available.
+            getNetwork().close();
+        }
+
+        _frameHandler = new FrameHandler(_endpoint);
+        return AMQP_LAYER_HEADER;
+    }
+
+    /**
+     * Prepares the connection for a SASL exchange: routes SASL frames through this object,
+     * installs a SASLFrameHandler, and registers a completion callback that either sends the
+     * AMQP-layer header (endpoint authenticated) or closes the network connection.
+     *
+     * @return the shared SASL-layer header buffer (not a copy)
+     */
+    public ByteBuffer initiateSasl()
+    {
+        _endpoint.setSaslFrameOutput(this);
+
+        _endpoint.setOnSaslComplete(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                if (!_endpoint.isAuthenticated())
+                {
+                    getNetwork().close();
+                    return;
+                }
+                getSender().send(AMQP_LAYER_HEADER.duplicate());
+                getSender().flush();
+            }
+        });
+
+        _frameHandler = new SASLFrameHandler(_endpoint);
+        return SASL_LAYER_HEADER;
+    }
+
+    /**
+     * Reports whether message assignment is currently suspended from the caller's point of view.
+     *
+     * @return true when a thread other than the calling thread has suspended assignment; false
+     *         when nothing is suspended or the calling thread itself is the suspender
+     */
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        // Read the reference exactly once: a second read could observe a concurrent release
+        // (null) and make "null != currentThread" report a suspension that no longer exists.
+        final Thread suspender = _messageAssignmentSuspended.get();
+        return suspender != null && suspender != Thread.currentThread();
+    }
+
+    /**
+     * Suspends or resumes message assignment for this connection.
+     * <p>
+     * The calling thread is recorded as the suspender (or the record is cleared), and then every
+     * consumer of every session is visited: on resume its target is asked to notify its current
+     * state; on suspend its send lock is taken and released once.
+     *
+     * @param messageAssignmentSuspended true to suspend, false to resume
+     */
+    @Override
+    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+    {
+        _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
+
+        for (AMQSessionModel<?> sessionModel : _connection.getSessionModels())
+        {
+            for (Consumer<?> modelConsumer : sessionModel.getConsumers())
+            {
+                final ConsumerImpl consumer = (ConsumerImpl) modelConsumer;
+                if (messageAssignmentSuspended)
+                {
+                    // ensure that by the time the method returns, no consumer can be in the
+                    // process of delivering a message.
+                    consumer.getSendLock();
+                    consumer.releaseSendLock();
+                }
+                else
+                {
+                    consumer.getTarget().notifyCurrentState();
+                }
+            }
+        }
+    }
+
+
+    /** Writer-idle notification; currently does nothing. */
+    public void writerIdle()
+    {
+        // TODO: not implemented
+    }
+
+    /** Reader-idle notification; currently does nothing. */
+    public void readerIdle()
+    {
+        // TODO: not implemented
+    }
+
+    /** Intentionally empty: this implementation takes no action for this callback. */
+    @Override
+    public void encryptedTransport()
+    {
+    }
+
+    /**
+     * Adapts a SubjectCreator to the SaslServerProvider interface expected by the endpoint.
+     *
+     * @param subjectCreator used to create the SaslServer for a requested mechanism
+     * @param network        supplies the peer principal passed along when creating the SaslServer
+     * @return a provider whose authenticated principal wraps the server's authorization id
+     */
+    private static SaslServerProvider asSaslServerProvider(final SubjectCreator subjectCreator,
+                                                           final NetworkConnection network)
+    {
+        return new SaslServerProvider()
+        {
+            @Override
+            public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException
+            {
+                final Principal peer = network.getPeerPrincipal();
+                return subjectCreator.createSaslServer(mechanism, fqdn, peer);
+            }
+
+            @Override
+            public Principal getAuthenticatedPrincipal(SaslServer server)
+            {
+                final String authorizationId = server.getAuthorizationID();
+                return new AuthenticatedPrincipal(new UsernamePrincipal(authorizationId));
+            }
+        };
+    }
+
+    /**
+     * @return the string form of the network connection's remote address
+     */
+    public String getAddress()
+    {
+        final SocketAddress remote = getNetwork().getRemoteAddress();
+        return remote.toString();
+    }
+
+    /** Logger named "RAW", used at debug level to dump the raw bytes received and sent. */
+    private static final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW");
+
+
+    /**
+     * Consumes bytes read from the network.
+     * <p>
+     * Until eight header octets have been seen (possibly spread over several calls) they are
+     * consumed one at a time; the first five are discarded without inspection and the last three
+     * are stored as major, minor and revision. Any bytes after the header are passed to the
+     * current frame handler, running as the connection's subject.
+     * <p>
+     * A RuntimeException raised during processing is logged and the network connection is closed.
+     *
+     * @param msg buffer positioned at the first unread byte
+     */
+    public synchronized void received(final ByteBuffer msg)
+    {
+        try
+        {
+            _lastReadTime = System.currentTimeMillis();
+            if (RAW_LOGGER.isDebugEnabled())
+            {
+                // Dump a copy so the position of the caller's buffer is left untouched.
+                final ByteBuffer copy = msg.duplicate();
+                final byte[] raw = new byte[copy.remaining()];
+                copy.get(raw);
+                final Binary dump = new Binary(raw);
+                RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + dump.toString());
+            }
+
+            // Header phase: take one octet per iteration and move to the next expected position.
+            while (_state != State.FRAME && msg.hasRemaining())
+            {
+                final byte octet = msg.get();
+                switch (_state)
+                {
+                    case A:
+                        _state = State.M;
+                        break;
+                    case M:
+                        _state = State.Q;
+                        break;
+                    case Q:
+                        _state = State.P;
+                        break;
+                    case P:
+                        _state = State.PROTOCOL;
+                        break;
+                    case PROTOCOL:
+                        _state = State.MAJOR;
+                        break;
+                    case MAJOR:
+                        _major = octet;
+                        _state = State.MINOR;
+                        break;
+                    case MINOR:
+                        _minor = octet;
+                        _state = State.REVISION;
+                        break;
+                    case REVISION:
+                        _revision = octet;
+                        _state = State.FRAME;
+                        break;
+                    default:
+                        // FRAME is excluded by the loop condition.
+                        break;
+                }
+            }
+
+            // Frame phase: everything left in the buffer belongs to the frame handler.
+            if (_state == State.FRAME && msg.hasRemaining())
+            {
+                Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>()
+                {
+                    @Override
+                    public Void run()
+                    {
+                        _frameHandler = _frameHandler.parse(msg);
+                        return null;
+                    }
+                });
+            }
+        }
+        catch (RuntimeException e)
+        {
+            LOGGER.error("Exception while processing incoming data", e);
+            getNetwork().close();
+        }
+    }
+
+
+    /**
+     * Handles closure of the connection: tells the endpoint its input is closed and then, even
+     * if that call throws, notifies the endpoint's connection event listener (if one is set).
+     * A RuntimeException from either step is logged and the network connection is closed.
+     */
+    public void closed()
+    {
+        try
+        {
+            try
+            {
+                _endpoint.inputClosed();
+            }
+            finally
+            {
+                // _endpoint has already been dereferenced above, so a null check on it here
+                // could never help. Read the listener once so the check and the call agree.
+                final Object listener = _endpoint.getConnectionEventListener();
+                if (listener != null)
+                {
+                    ((Connection_1_0) listener).closed();
+                }
+            }
+        }
+        catch (RuntimeException e)
+        {
+            LOGGER.error("Exception while closing", e);
+            getNetwork().close();
+        }
+    }
+
+    /**
+     * Moves the underlying network connection to another scheduler. The network connection is
+     * cast to NonBlockingConnection, so a different implementation fails with a ClassCastException.
+     *
+     * @param networkConnectionScheduler the scheduler to switch to
+     */
+    void changeScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+    {
+        final NonBlockingConnection nonBlocking = (NonBlockingConnection) getNetwork();
+        nonBlocking.changeScheduler(networkConnectionScheduler);
+    }
+
+
+    /**
+     * @return the System.currentTimeMillis() value captured when this object was created
+     */
+    public long getCreateTime()
+    {
+        return _createTime;
+    }
+
+
+    /**
+     * @return always true
+     */
+    public boolean canSend()
+    {
+        return true;
+    }
+
+    /**
+     * Sends a frame; equivalent to calling the two-argument overload with a null buffer.
+     *
+     * @param amqFrame the frame to encode and send
+     */
+    public void send(final AMQFrame amqFrame)
+    {
+        send(amqFrame, null);
+    }
+
+    /** Logger named "FRM", used at debug level to trace frames received and sent. */
+    private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM");
+
+
+    /**
+     * Encodes a frame and hands it to the network sender, flushing immediately. The whole
+     * operation runs while holding the send lock.
+     *
+     * @param amqFrame the frame to encode and send
+     * @param buf      not used by this implementation
+     * @throws OversizeFrameException when the size reported by the frame writer exceeds the
+     *                                endpoint's maximum frame size
+     */
+    public void send(final AMQFrame amqFrame, ByteBuffer buf)
+    {
+        synchronized (_sendLock)
+        {
+            _lastWriteTime = System.currentTimeMillis();
+            if (FRAME_LOGGER.isDebugEnabled())
+            {
+                FRAME_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
+            }
+
+            _frameWriter.setValue(amqFrame);
+
+            // The buffer is sized to the largest frame the endpoint allows.
+            final ByteBuffer encoded = ByteBuffer.allocate(_endpoint.getMaxFrameSize());
+            final int encodedSize = _frameWriter.writeToBuffer(encoded);
+            if (encodedSize > _endpoint.getMaxFrameSize())
+            {
+                throw new OversizeFrameException(amqFrame, encodedSize);
+            }
+
+            encoded.flip();
+
+            if (RAW_LOGGER.isDebugEnabled())
+            {
+                // Dump from a duplicate so the buffer about to be sent keeps its position.
+                final ByteBuffer copy = encoded.duplicate();
+                final byte[] raw = new byte[copy.remaining()];
+                copy.get(raw);
+                final Binary dump = new Binary(raw);
+                RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + dump.toString());
+            }
+
+            getSender().send(encoded);
+            getSender().flush();
+        }
+    }
+
+    /**
+     * Wraps a frame body in a frame for the given channel and sends it.
+     *
+     * @param channel channel number placed on the frame
+     * @param body    the frame body to send
+     */
+    public void send(short channel, FrameBody body)
+    {
+        send(AMQFrame.createAMQFrame(channel, body));
+    }
+
+    /** Runs the superclass delete tasks; adds nothing of its own. */
+    @Override
+    protected void performDeleteTasks()
+    {
+        super.performDeleteTasks();
+    }
+
+    /**
+     * Registers a ConnectionClosingTicker for the network connection with the aggregate ticker,
+     * timed for CLOSE_REPONSE_TIMEOUT milliseconds from now.
+     */
+    public void close()
+    {
+        final long deadline = System.currentTimeMillis() + CLOSE_REPONSE_TIMEOUT;
+        final ConnectionClosingTicker closingTicker = new ConnectionClosingTicker(deadline, getNetwork());
+        getAggregateTicker().addTicker(closingTicker);
+    }
+
+    /**
+     * @return the System.currentTimeMillis() value recorded by the latest received(ByteBuffer)
+     *         call, or 0 if nothing has been received yet
+     */
+    public long getLastReadTime()
+    {
+        return _lastReadTime;
+    }
+
+    /**
+     * @return the System.currentTimeMillis() value recorded by the latest frame send, or 0 if
+     *         no frame has been sent yet
+     */
+    public long getLastWriteTime()
+    {
+        return _lastWriteTime;
+    }
+
+    /**
+     * @return the flag last stored by setTransportBlockedForWriting(boolean)
+     */
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+    /**
+     * Records whether the transport is blocked for writing and then tells the connection model
+     * that the transport state changed (the notification is sent on every call).
+     *
+     * @param blocked the new blocked-for-writing flag
+     */
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _transportBlockedForWriting = blocked;
+        _connection.transportStateChanged();
+    }
+
+    /** Delegates pending-work processing to the connection model. */
+    @Override
+    public void processPending()
+    {
+        _connection.processPending();
+    }
+
+    /**
+     * @return true if notifyWork() has been called since the last clearWork()
+     */
+    @Override
+    public boolean hasWork()
+    {
+        return _stateChanged.get();
+    }
+
+    /**
+     * Marks this connection as having work and, if a work listener is registered, invokes it
+     * with this connection.
+     */
+    @Override
+    public void notifyWork()
+    {
+        _stateChanged.set(true);
+
+        final Action<ProtocolEngine> workListener = _workListener.get();
+        if (workListener == null)
+        {
+            return;
+        }
+        workListener.performAction(this);
+    }
+
+    /** Resets the has-work flag set by notifyWork(). */
+    @Override
+    public void clearWork()
+    {
+        _stateChanged.set(false);
+    }
+
+    /**
+     * Registers the listener invoked by notifyWork(), replacing any previous one.
+     *
+     * @param listener the listener to invoke, or null to remove the current one
+     */
+    @Override
+    public void setWorkListener(final Action<ProtocolEngine> listener)
+    {
+        _workListener.set(listener);
+    }
+
+    /**
+     * @param name ignored by this implementation
+     * @return always false
+     */
+    public boolean hasSessionWithName(final byte[] name)
+    {
+        return false;
+    }
+
+    /** Delegates the asynchronous close request, unchanged, to the connection model. */
+    public void closeAsync(final AMQConstant cause, final String message)
+    {
+        _connection.closeAsync(cause, message);
+    }
+
+    /**
+     * @return the authorized principal reported by the connection model
+     */
+    public Principal getAuthorizedPrincipal()
+    {
+        return _connection.getAuthorizedPrincipal();
+    }
+
+    /**
+     * Asks the connection model to close one session asynchronously.
+     *
+     * @param session must be a Session_1_0; any other implementation fails the cast
+     * @param cause   passed through unchanged
+     * @param message passed through unchanged
+     */
+    public void closeSessionAsync(final AMQSessionModel<?> session,
+                                  final AMQConstant cause, final String message)
+    {
+        final Session_1_0 session_1_0 = (Session_1_0) session;
+        _connection.closeSessionAsync(session_1_0, cause, message);
+    }
+
+    /** Delegates to the connection model's block(). */
+    public void block()
+    {
+        _connection.block();
+    }
+
+    /**
+     * @return the remote container name reported by the connection model
+     */
+    public String getRemoteContainerName()
+    {
+        return _connection.getRemoteContainerName();
+    }
+
+    /**
+     * @return the virtual host reported by the connection model
+     */
+    public VirtualHost<?, ?, ?> getVirtualHost()
+    {
+        return _connection.getVirtualHost();
+    }
+
+    /**
+     * @return the sessions reported by the connection model
+     */
+    public List<Session_1_0> getSessionModels()
+    {
+        return _connection.getSessionModels();
+    }
+
+    /** Delegates to the connection model's unblock(). */
+    public void unblock()
+    {
+        _connection.unblock();
+    }
+
+    /**
+     * @return the log subject reported by the connection model
+     */
+    public LogSubject getLogSubject()
+    {
+        return _connection.getLogSubject();
+    }
+
+    /**
+     * @return the session count limit reported by the connection model
+     */
+    public long getSessionCountLimit()
+    {
+        return _connection.getSessionCountLimit();
+    }
+
+}

Propchange: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
 Sat Jun 27 21:13:25 2015
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
 
 import static 
org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
 
-import java.net.SocketAddress;
 import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.text.MessageFormat;
@@ -31,10 +30,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.security.auth.Subject;
 
@@ -43,46 +42,32 @@ import org.apache.qpid.amqp_1_0.transpor
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
+import org.apache.qpid.amqp_1_0.type.Symbol;
 import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
 import org.apache.qpid.amqp_1_0.type.transport.End;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.adapter.ConnectionAdapter;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.protocol.SessionModelListener;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
-import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
-public class Connection_1_0 implements ConnectionEventListener, 
AMQConnectionModel<Connection_1_0,Session_1_0>
+public class Connection_1_0 implements ConnectionEventListener
 {
 
     private final AmqpPort<?> _port;
-    private final Broker<?> _broker;
     private final SubjectCreator _subjectCreator;
-    private final ProtocolEngine_1_0_0 _protocolEngine;
-    private VirtualHostImpl _vhost;
+    private AMQPConnection_1_0 _amqpConnection;
+    private VirtualHostImpl<?,?,?> _vhost;
     private final Transport _transport;
-    private final ConnectionEndpoint _conn;
+    private final ConnectionEndpoint _connectionEndpoint;
     private final long _connectionId;
     private final Collection<Session_1_0> _sessions = 
Collections.synchronizedCollection(new ArrayList<Session_1_0>());
     private final Object _reference = new Object();
-    private final Subject _subject = new Subject();
-
-    private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners 
=
-            new CopyOnWriteArrayList<SessionModelListener>();
-
-    private final StatisticsCounter _messageDeliveryStatistics, 
_messageReceiptStatistics, _dataDeliveryStatistics, _dataReceiptStatistics;
 
     private final LogSubject _logSubject = new LogSubject()
     {
@@ -100,42 +85,34 @@ public class Connection_1_0 implements C
         }
     };
 
-    private volatile boolean _stopped;
-
-
-    private List<Action<? super Connection_1_0>> _closeTasks =
-            Collections.synchronizedList(new ArrayList<Action<? super 
Connection_1_0>>());
-
     private final Queue<Action<? super Connection_1_0>> _asyncTaskList =
             new ConcurrentLinkedQueue<>();
 
 
     private boolean _closedOnOpen;
-    private ConnectionAdapter _adapter;
 
 
-    public Connection_1_0(Broker<?> broker,
-                          ConnectionEndpoint conn,
+    public Connection_1_0(ConnectionEndpoint connectionEndpoint,
                           long connectionId,
                           AmqpPort<?> port,
                           Transport transport,
-                          final SubjectCreator subjectCreator,
-                          final ProtocolEngine_1_0_0 protocolEngine)
+                          final SubjectCreator subjectCreator)
     {
-        _protocolEngine = protocolEngine;
-        _broker = broker;
         _port = port;
         _transport = transport;
-        _conn = conn;
+        _connectionEndpoint = connectionEndpoint;
         _connectionId = connectionId;
-        _subject.getPrincipals().add(new ConnectionPrincipal(this));
         _subjectCreator = subjectCreator;
-        _messageDeliveryStatistics = new 
StatisticsCounter("messages-delivered-" + getConnectionId());
-        _dataDeliveryStatistics = new StatisticsCounter("data-delivered-" + 
getConnectionId());
-        _messageReceiptStatistics = new StatisticsCounter("messages-received-" 
+ getConnectionId());
-        _dataReceiptStatistics = new StatisticsCounter("data-received-" + 
getConnectionId());
-        _adapter = new ConnectionAdapter(this);
-        _adapter.create();
+    }
+
+    void setAmqpConnection(final AMQPConnection_1_0 amqpConnection)
+    {
+        _amqpConnection = amqpConnection;
+    }
+
+    public ConnectionEndpoint getConnectionEndpoint()
+    {
+        return _connectionEndpoint;
     }
 
     public Object getReference()
@@ -146,35 +123,48 @@ public class Connection_1_0 implements C
     @Override
     public void openReceived()
     {
-        String host = _conn.getLocalHostname();
+        String host = _connectionEndpoint.getLocalHostname();
+        Map clientProperties = _connectionEndpoint.getRemoteProperties();
+        if(clientProperties != null)
+        {
+            if(clientProperties.containsKey(Symbol.valueOf("product")))
+            {
+                
_amqpConnection.setClientProduct(clientProperties.get(Symbol.valueOf("product")).toString());
+            }
+            if(clientProperties.containsKey(Symbol.valueOf("version")))
+            {
+                
_amqpConnection.setClientVersion(clientProperties.get(Symbol.valueOf("version")).toString());
+            }
+            
_amqpConnection.setClientId(_connectionEndpoint.getRemoteContainerId());
+        }
         _vhost = ((AmqpPort)_port).getVirtualHost(host);
         if(_vhost == null)
         {
             final Error err = new Error();
             err.setCondition(AmqpError.NOT_FOUND);
             err.setDescription("Unknown hostname in connection open: '" + host 
+ "'");
-            _conn.close(err);
+            _connectionEndpoint.close(err);
             _closedOnOpen = true;
         }
         else
         {
-            final Principal user = _conn.getUser();
+            final Principal user = _connectionEndpoint.getUser();
             if(user != null)
             {
                 setUserPrincipal(user);
             }
-            _subject.getPrincipals().add(_vhost.getPrincipal());
-            
if(AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_subject)
 == null)
+            
_amqpConnection.getSubject().getPrincipals().add(_vhost.getPrincipal());
+            
if(AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject())
 == null)
             {
                 final Error err = new Error();
                 err.setCondition(AmqpError.NOT_ALLOWED);
                 err.setDescription("Connection has not been authenticated");
-                _conn.close(err);
+                _connectionEndpoint.close(err);
                 _closedOnOpen = true;
             }
             else
             {
-                _adapter.virtualHostAssociated();
+                _amqpConnection.virtualHostAssociated();
             }
         }
     }
@@ -182,9 +172,9 @@ public class Connection_1_0 implements C
     void setUserPrincipal(final Principal user)
     {
         Subject authSubject = _subjectCreator.createSubjectWithGroups(user);
-        _subject.getPrincipals().addAll(authSubject.getPrincipals());
-        _subject.getPublicCredentials().addAll(authSubject.getPublicCredentials());
-        _subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
+        _amqpConnection.getSubject().getPrincipals().addAll(authSubject.getPrincipals());
+        _amqpConnection.getSubject().getPublicCredentials().addAll(authSubject.getPublicCredentials());
+        _amqpConnection.getSubject().getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
     }
 
     public void remoteSessionCreation(SessionEndpoint endpoint)
@@ -193,7 +183,7 @@ public class Connection_1_0 implements C
         {
             final Session_1_0 session = new Session_1_0(this, endpoint);
             _sessions.add(session);
-            sessionAdded(session);
+            _amqpConnection.sessionAdded(session);
             endpoint.setSessionEventListener(new SessionEventListener()
             {
                 @Override
@@ -231,22 +221,11 @@ public class Connection_1_0 implements C
     {
         if(!_closedOnOpen)
         {
-
             _sessions.remove(session);
-            sessionRemoved(session);
+            _amqpConnection.sessionRemoved(session);
         }
     }
 
-    public void removeDeleteTask(final Action<? super Connection_1_0> task)
-    {
-        _closeTasks.remove( task );
-    }
-
-    public void addDeleteTask(final Action<? super Connection_1_0> task)
-    {
-        _closeTasks.add( task );
-    }
-
     private void addAsyncTask(final Action<Connection_1_0> action)
     {
         _asyncTaskList.add(action);
@@ -256,7 +235,7 @@ public class Connection_1_0 implements C
 
     public void closeReceived()
     {
-        Collection<Session_1_0> sessions = new ArrayList(_sessions);
+        Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
 
         for(Session_1_0 session : sessions)
         {
@@ -265,7 +244,7 @@ public class Connection_1_0 implements C
 
         if(_vhost != null)
         {
-            _vhost.deregisterConnection(_adapter);
+            _vhost.deregisterConnection(_amqpConnection);
         }
 
 
@@ -273,29 +252,16 @@ public class Connection_1_0 implements C
 
     void performCloseTasks()
     {
-        List<Action<? super Connection_1_0>> taskCopy;
-
-        synchronized (_closeTasks)
-        {
-            taskCopy = new ArrayList<Action<? super Connection_1_0>>(_closeTasks);
-        }
-        for(Action<? super Connection_1_0> task : taskCopy)
-        {
-            task.performAction(this);
-        }
-        synchronized (_closeTasks)
-        {
-            _closeTasks.clear();
-        }
+        _amqpConnection.performDeleteTasks();
     }
 
     public void closed()
     {
+        performCloseTasks();
         closeReceived();
     }
 
 
-    @Override
     public void closeAsync(AMQConstant cause, String message)
     {
         Action<Connection_1_0> action = new Action<Connection_1_0>()
@@ -303,27 +269,23 @@ public class Connection_1_0 implements C
             @Override
             public void performAction(final Connection_1_0 object)
             {
-                _conn.close();
+                _connectionEndpoint.close();
 
             }
         };
         addAsyncTask(action);
-
     }
 
-    @Override
     public void block()
     {
         // TODO
     }
 
-    @Override
     public void unblock()
     {
         // TODO
     }
 
-    @Override
     public void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message)
     {
         addAsyncTask(new Action<Connection_1_0>()
@@ -336,187 +298,65 @@ public class Connection_1_0 implements C
         });
     }
 
-    @Override
     public long getConnectionId()
     {
         return _connectionId;
     }
 
-    @Override
     public List<Session_1_0> getSessionModels()
     {
-        return new ArrayList<Session_1_0>(_sessions);
+        return new ArrayList<>(_sessions);
     }
 
-    @Override
     public LogSubject getLogSubject()
     {
         return _logSubject;
     }
 
-    @Override
     public String getRemoteAddressString()
     {
-        return String.valueOf(_conn.getRemoteAddress());
-    }
-
-    public SocketAddress getRemoteAddress()
-    {
-        return _conn.getRemoteAddress();
-    }
-
-    @Override
-    public String getRemoteProcessPid()
-    {
-        return null;  // TODO
+        return String.valueOf(_connectionEndpoint.getRemoteAddress());
     }
 
-    @Override
     public String getClientId()
     {
-        return _conn.getRemoteContainerId();
+        return _connectionEndpoint.getRemoteContainerId();
     }
 
-    @Override
     public String getRemoteContainerName()
     {
-        return _conn.getRemoteContainerId();
-    }
-
-    @Override
-    public String getClientVersion()
-    {
-        return "";  //TODO
-    }
-
-    @Override
-    public String getClientProduct()
-    {
-        return "";  //TODO
+        return _connectionEndpoint.getRemoteContainerId();
     }
 
     public Principal getAuthorizedPrincipal()
     {
-        Set<AuthenticatedPrincipal> authPrincipals = _subject.getPrincipals(AuthenticatedPrincipal.class);
+        Set<AuthenticatedPrincipal> authPrincipals = _amqpConnection.getSubject().getPrincipals(AuthenticatedPrincipal.class);
         return authPrincipals.isEmpty() ? null : authPrincipals.iterator().next();
     }
 
-    @Override
     public long getSessionCountLimit()
     {
         return 0;  // TODO
     }
 
-    @Override
-    public long getLastIoTime()
-    {
-        return 0;  // TODO
-    }
-
-    @Override
-    public String getVirtualHostName()
-    {
-        return _vhost == null ? null : _vhost.getName();
-    }
-
-    @Override
     public AmqpPort<?> getPort()
     {
         return _port;
     }
 
-    @Override
-    public ProtocolEngine getProtocolEngine()
-    {
-        return _protocolEngine;
-    }
-
-    @Override
-    public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+    public AMQPConnection_1_0 getAmqpConnection()
     {
-        _protocolEngine.changeScheduler(networkConnectionScheduler);
+        return _amqpConnection;
     }
 
-    @Override
     public Transport getTransport()
     {
         return _transport;
     }
 
-    @Override
-    public void stop()
-    {
-        _stopped = true;
-    }
-
-    @Override
-    public boolean isStopped()
-    {
-        return _stopped;
-    }
-
-    @Override
-    public void registerMessageReceived(long messageSize, long timestamp)
-    {
-        _messageReceiptStatistics.registerEvent(1L, timestamp);
-        _dataReceiptStatistics.registerEvent(messageSize, timestamp);
-        _vhost.registerMessageReceived(messageSize,timestamp);
-
-    }
-
-    @Override
-    public void registerMessageDelivered(long messageSize)
-    {
-
-        _messageDeliveryStatistics.registerEvent(1L);
-        _dataDeliveryStatistics.registerEvent(messageSize);
-        _vhost.registerMessageDelivered(messageSize);
-    }
-
-    @Override
-    public StatisticsCounter getMessageDeliveryStatistics()
-    {
-        return _messageDeliveryStatistics;
-    }
-
-    @Override
-    public StatisticsCounter getMessageReceiptStatistics()
-    {
-        return _messageReceiptStatistics;
-    }
-
-    @Override
-    public StatisticsCounter getDataDeliveryStatistics()
-    {
-        return _dataDeliveryStatistics;
-    }
-
-    @Override
-    public StatisticsCounter getDataReceiptStatistics()
-    {
-        return _dataReceiptStatistics;
-    }
-
-    @Override
-    public void resetStatistics()
-    {
-        _dataDeliveryStatistics.reset();
-        _dataReceiptStatistics.reset();
-        _messageDeliveryStatistics.reset();
-        _messageReceiptStatistics.reset();
-    }
-
-
-
-    AMQConnectionModel getModel()
-    {
-        return this;
-    }
-
-
     Subject getSubject()
     {
-        return _subject;
+        return _amqpConnection.getSubject();
     }
 
     public VirtualHostImpl getVirtualHost()
@@ -525,35 +365,6 @@ public class Connection_1_0 implements C
     }
 
 
-    @Override
-    public void addSessionListener(final SessionModelListener listener)
-    {
-        _sessionListeners.add(listener);
-    }
-
-    @Override
-    public void removeSessionListener(final SessionModelListener listener)
-    {
-        _sessionListeners.remove(listener);
-    }
-
-    private void sessionAdded(final AMQSessionModel<?,?> session)
-    {
-        for(SessionModelListener l : _sessionListeners)
-        {
-            l.sessionAdded(session);
-        }
-    }
-
-    private void sessionRemoved(final AMQSessionModel<?,?> session)
-    {
-        for(SessionModelListener l : _sessionListeners)
-        {
-            l.sessionRemoved(session);
-        }
-    }
-
-
     public void transportStateChanged()
     {
         for (Session_1_0 session : _sessions)
@@ -562,25 +373,18 @@ public class Connection_1_0 implements C
         }
     }
 
-    @Override
     public void notifyWork()
     {
-        _protocolEngine.notifyWork();
-    }
-
-    @Override
-    public boolean isMessageAssignmentSuspended()
-    {
-        return _protocolEngine.isMessageAssignmentSuspended();
+        _amqpConnection.notifyWork();
     }
 
     public void processPending()
     {
-        List<? extends AMQSessionModel<?,?>> sessionsWithPending = new ArrayList<>(getSessionModels());
+        List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels());
         while(!sessionsWithPending.isEmpty())
         {
-            final Iterator<? extends AMQSessionModel<?, ?>> iter = sessionsWithPending.iterator();
-            AMQSessionModel<?, ?> session;
+            final Iterator<? extends AMQSessionModel<?>> iter = sessionsWithPending.iterator();
+            AMQSessionModel<?> session;
             while(iter.hasNext())
             {
                 session = iter.next();
@@ -605,7 +409,7 @@ public class Connection_1_0 implements C
         return "Connection_1_0["
                +  _connectionId
                + " "
-               + _protocolEngine.getRemoteAddress().toString()
+               + _amqpConnection.getAddress()
                + (_vhost == null ? "" : (" vh : " + _vhost.getName()))
                + ']';
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 Sat Jun 27 21:13:25 2015
@@ -92,7 +92,7 @@ class ConsumerTarget_1_0 extends Abstrac
     @Override
     public boolean doIsSuspended()
     {
-        return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;
+        return _link.getSession().getAMQPConnection().isConnectionStopped() || getState() != State.ACTIVE;
 
     }
 
@@ -264,7 +264,7 @@ class ConsumerTarget_1_0 extends Abstrac
                     }
 
                 }
-                getSession().getConnectionModel().registerMessageDelivered(message.getSize());
+                getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
                 getEndpoint().transfer(transfer);
             }
             else
@@ -297,7 +297,7 @@ class ConsumerTarget_1_0 extends Abstrac
         synchronized (_link.getLock())
         {
 
-            ProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+            ProtocolEngine protocolEngine = getSession().getConnection().getAmqpConnection();
             final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
             if(!hasCredit && getState() == State.ACTIVE)
             {
@@ -335,7 +335,7 @@ class ConsumerTarget_1_0 extends Abstrac
     {
         synchronized(_link.getLock())
         {
-            ProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+            ProtocolEngine protocolEngine = getSession().getConnection().getAmqpConnection();
             if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
             {
                 updateState(State.SUSPENDED, State.ACTIVE);

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
 Sat Jun 27 21:13:25 2015
@@ -89,7 +89,7 @@ public class ProtocolEngineCreator_1_0_0
         if(supportedMechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME)
                 || (supportedMechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) && network.getPeerPrincipal() != null))
         {
-            return new ProtocolEngine_1_0_0(network, broker, id, port, transport, aggregateTicker, false);
+            return new AMQPConnection_1_0(broker, network, port, transport, id, aggregateTicker, false);
         }
         else
         {

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
 Sat Jun 27 21:13:25 2015
@@ -65,7 +65,7 @@ public class ProtocolEngineCreator_1_0_0
                                             Transport transport,
                                             long id, final AggregateTicker aggregateTicker)
     {
-        return new ProtocolEngine_1_0_0(network, broker, id, port, transport, aggregateTicker, true);
+        return new AMQPConnection_1_0(broker, network, port, transport, id, aggregateTicker, true);
     }
 
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
 Sat Jun 27 21:13:25 2015
@@ -215,7 +215,7 @@ public class ReceivingLink_1_0 implement
 
             getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
 
-            getSession().getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
+            getSession().getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
 
             if(!(transaction instanceof AutoCommitTransaction))
             {

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 Sat Jun 27 21:13:25 2015
@@ -85,6 +85,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.ConsumerListener;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
@@ -93,7 +94,7 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.network.Ticker;
 
-public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject
+public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0>, LogSubject
 {
     private static final Logger _logger = LoggerFactory.getLogger(Session_1_0.class);
     private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
@@ -573,9 +574,9 @@ public class Session_1_0 implements Sess
     }
 
     @Override
-    public Connection_1_0 getConnectionModel()
+    public AMQPConnection<?> getAMQPConnection()
     {
-        return _connection;
+        return _connection.getAmqpConnection();
     }
 
     @Override
@@ -737,9 +738,9 @@ public class Session_1_0 implements Sess
 
     public String toLogString()
     {
-        long connectionId = getConnectionModel().getConnectionId();
+        long connectionId = getAMQPConnection().getConnectionId();
 
-        String remoteAddress = getConnectionModel().getRemoteAddressString();
+        String remoteAddress = getAMQPConnection().getRemoteAddressString();
 
         return "[" +
                MessageFormat.format(CHANNEL_FORMAT,
@@ -943,13 +944,13 @@ public class Session_1_0 implements Sess
     @Override
     public void addTicker(final Ticker ticker)
     {
-        getConnection().getProtocolEngine().getAggregateTicker().addTicker(ticker);
+        getConnection().getAmqpConnection().getAggregateTicker().addTicker(ticker);
     }
 
     @Override
     public void removeTicker(final Ticker ticker)
     {
-        getConnection().getProtocolEngine().getAggregateTicker().removeTicker(ticker);
+        getConnection().getAmqpConnection().getAggregateTicker().removeTicker(ticker);
     }
 
     private void consumerAdded(Consumer<?> consumer)

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
 Sat Jun 27 21:13:25 2015
@@ -41,6 +41,7 @@ import java.util.UUID;
 import javax.security.auth.Subject;
 
 import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
@@ -70,7 +71,7 @@ import org.apache.qpid.transport.network
 
 public class ProtocolEngine_1_0_0Test extends QpidTestCase
 {
-    private ProtocolEngine_1_0_0 _protocolEngine_1_0_0;
+    private AMQPConnection_1_0 _protocolEngine_1_0_0;
     private NetworkConnection _networkConnection;
     private Broker<?> _broker;
     private AmqpPort _port;
@@ -78,7 +79,7 @@ public class ProtocolEngine_1_0_0Test ex
     private AuthenticationProvider _authenticationProvider;
     private List<ByteBuffer> _sentBuffers;
     private FrameWriter _frameWriter;
-    private Connection _connection;
+    private AMQPConnection _connection;
     private VirtualHostImpl _virtualHost;
 
     @Override
@@ -103,7 +104,7 @@ public class ProtocolEngine_1_0_0Test ex
         _virtualHost = mock(VirtualHostImpl.class);
         when(_virtualHost.getChildExecutor()).thenReturn(taskExecutor);
         when(_virtualHost.getModel()).thenReturn(BrokerModel.getInstance());
-        final ArgumentCaptor<Connection> connectionCaptor = ArgumentCaptor.forClass(Connection.class);
+        final ArgumentCaptor<AMQPConnection> connectionCaptor = ArgumentCaptor.forClass(AMQPConnection.class);
         doAnswer(new Answer()
         {
             @Override
@@ -169,7 +170,7 @@ public class ProtocolEngine_1_0_0Test ex
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
 
-        verify(_virtualHost).registerConnection(any(Connection.class));
+        verify(_virtualHost).registerConnection(any(AMQPConnection.class));
         AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getUnderlyingConnection().getAuthorizedPrincipal();
         assertNotNull(principal);
         assertEquals(principal, new AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
@@ -192,7 +193,7 @@ public class ProtocolEngine_1_0_0Test ex
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
 
-        verify(_virtualHost, never()).registerConnection(any(Connection.class));
+        verify(_virtualHost, never()).registerConnection(any(AMQPConnection.class));
         verify(_networkConnection).close();
     }
 
@@ -222,7 +223,7 @@ public class ProtocolEngine_1_0_0Test ex
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
 
-        verify(_virtualHost).registerConnection(any(Connection.class));
+        verify(_virtualHost).registerConnection(any(AMQPConnection.class));
         AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal) 
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
         assertNotNull(authPrincipal);
         assertEquals(authPrincipal, new AuthenticatedPrincipal(principal));
@@ -264,7 +265,7 @@ public class ProtocolEngine_1_0_0Test ex
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
 
-        verify(_virtualHost).registerConnection(any(Connection.class));
+        verify(_virtualHost).registerConnection(any(AMQPConnection.class));
         AuthenticatedPrincipal principal = (AuthenticatedPrincipal) 
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
         assertNotNull(principal);
         assertEquals(principal, new 
AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
@@ -273,8 +274,8 @@ public class ProtocolEngine_1_0_0Test ex
 
     private void createEngine(final boolean useSASL, Transport transport)
     {
-        _protocolEngine_1_0_0 = new ProtocolEngine_1_0_0(_networkConnection,
-                                                         _broker, 1, _port, transport, new AggregateTicker(),
+        _protocolEngine_1_0_0 = new AMQPConnection_1_0(_broker, _networkConnection,
+                                                         _port, transport, 1, new AggregateTicker(),
                                                          useSASL);
     }
 

Modified: 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
 Sat Jun 27 21:13:25 2015
@@ -993,7 +993,7 @@ class ManagementNode implements MessageS
     }
 
     @Override
-    public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+    public boolean verifySessionAccess(final AMQSessionModel<?> session)
     {
         return true;
     }

Modified: 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
 Sat Jun 27 21:13:25 2015
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
+import java.lang.reflect.WildcardType;
 import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -665,6 +666,14 @@ public class RestServlet extends Abstrac
                 return getRawType(bounds[0]);
             }
         }
+        else if(t instanceof WildcardType)
+        {
+            Type[] upperBounds = ((WildcardType)t).getUpperBounds();
+            if(upperBounds.length == 1)
+            {
+                return getRawType(upperBounds[0]);
+            }
+        }
         throw new ServerScopedRuntimeException("Unable to process type when constructing configuration model: " + t);
     }
 

Modified: 
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
 Sat Jun 27 21:13:25 2015
@@ -45,8 +45,7 @@ import org.eclipse.jetty.util.ssl.SslCon
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketHandler;
 
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.transport.ProtocolEngineFactory;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
@@ -67,7 +66,7 @@ class WebSocketProvider implements Accep
     private final AmqpPort<?> _port;
     private final Set<Protocol> _supported;
     private final Protocol _defaultSupportedProtocolReply;
-    private final ProtocolEngineFactory _factory;
+    private final MultiVersionProtocolEngineFactory _factory;
     private Server _server;
 
     WebSocketProvider(final Transport transport,
@@ -202,7 +201,7 @@ class WebSocketProvider implements Accep
         private final Certificate _userCertificate;
         private Connection _connection;
         private final Transport _transport;
-        private ProtocolEngine _engine;
+        private MultiVersionProtocolEngine _engine;
 
         private AmqpWebSocket(final Transport transport,
                               final SocketAddress localAddress,
@@ -231,7 +230,7 @@ class WebSocketProvider implements Accep
             final ConnectionWrapper connectionWrapper =
                     new ConnectionWrapper(connection, _localAddress, _remoteAddress);
             connectionWrapper.setPeerCertificate(_userCertificate);
-            _engine.setNetworkConnection(connectionWrapper, connectionWrapper.getSender());
+            _engine.setNetworkConnection(connectionWrapper);
 
         }
 

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 Sat Jun 27 21:13:25 2015
@@ -227,7 +227,7 @@ public class AMQConnectionDelegate_0_10
             _conn.setUsername(_qpidConnection.getUserID());
             _conn.setMaximumChannelCount(_qpidConnection.getChannelMax());
             _conn.getFailoverPolicy().attainedConnection();
-            _conn.logConnected(_qpidConnection.getLocalAddress(), _qpidConnection.getRemoteAddress());
+            _conn.logConnected(_qpidConnection.getLocalAddress(), _qpidConnection.getRemoteSocketAddress());
             _conn.setConnectionSettings(conSettings);
         }
         catch (ProtocolVersionException pe)

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 Sat Jun 27 21:13:25 2015
@@ -65,7 +65,6 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java 
(original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java 
Sat Jun 27 21:13:25 2015
@@ -736,7 +736,7 @@ public class Connection extends Connecti
         }
     }
 
-    public SocketAddress getRemoteAddress()
+    public SocketAddress getRemoteSocketAddress()
     {
         return _remoteAddress;
     }

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
 Sat Jun 27 21:13:25 2015
@@ -29,7 +29,6 @@ import static org.apache.qpid.transport.
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.Constant;
 import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.FrameSizeObserver;

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
 Sat Jun 27 21:13:25 2015
@@ -31,7 +31,6 @@ import javax.net.ssl.SSLSocket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.network.NetworkConnection;

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
 Sat Jun 27 21:13:25 2015
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.Atomi
 import javax.net.ssl.SSLSocket;
 
 import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.Ticker;

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
 Sat Jun 27 21:13:25 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.transport.network.security;
 
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
 Sat Jun 27 21:13:25 2015
@@ -26,7 +26,6 @@ import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManager;
 
 import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
 Sat Jun 27 21:13:25 2015
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
 
 import javax.security.sasl.SaslException;
 
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.util.Logger;

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
 Sat Jun 27 21:13:25 2015
@@ -28,7 +28,6 @@ import javax.net.ssl.SSLEngineResult.Han
 import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
 
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.security.SSLStatus;

Modified: 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
 (original)
+++ 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
 Sat Jun 27 21:13:25 2015
@@ -20,9 +20,7 @@
 */
 package org.apache.qpid.server.protocol;
 
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.net.InetSocketAddress;
@@ -34,12 +32,8 @@ import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.model.*;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.ByteBufferSender;
@@ -126,79 +120,6 @@ public class MultiVersionProtocolEngineF
             };
 
 
-    private byte[] getAmqpHeader(final Protocol version)
-    {
-        switch(version)
-        {
-            case AMQP_0_8:
-                return AMQP_0_8_HEADER;
-            case AMQP_0_9:
-                return AMQP_0_9_HEADER;
-            case AMQP_0_9_1:
-                return AMQP_0_9_1_HEADER;
-            case AMQP_0_10:
-                return AMQP_0_10_HEADER;
-            case AMQP_1_0:
-                return AMQP_1_0_0_HEADER;
-            default:
-                fail("unknown AMQP version, appropriate header must be added for new protocol version");
-                return null;
-        }
-    }
-
-    /**
-     * Test to verify that connections established using a MultiVersionProtocolEngine are assigned
-     * IDs from a common sequence, independent of the protocol version under use.
-     */
-    public void testDifferentProtocolVersionsShareCommonIDNumberingSequence()
-    {
-        Set<Protocol> protocols = getAllAMQPProtocols();
-
-        SubjectCreator subjectCreator = mock(SubjectCreator.class);
-
-        AuthenticationProvider<?> authProvider = mock(AuthenticationProvider.class);
-        when(authProvider.getSubjectCreator(false)).thenReturn(subjectCreator);
-
-        AmqpPort port = mock(AmqpPort.class);
-        when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
-        when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
-        when(port.getAuthenticationProvider()).thenReturn(authProvider);
-
-        TaskExecutor childExecutor = _broker.getChildExecutor();
-        when(port.getChildExecutor()).thenReturn(childExecutor);
-        when(port.getCategoryClass()).thenReturn(Port.class);
-        when(port.getModel()).thenReturn(BrokerModel.getInstance());
-
-        when(port.getContextValue(eq(Long.class), eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l);
-        MultiVersionProtocolEngineFactory factory =
-            new MultiVersionProtocolEngineFactory(_broker, protocols, null, port,
-                    org.apache.qpid.server.model.Transport.TCP);
-
-        //create a dummy to retrieve the 'current' ID number
-        long previousId = factory.newProtocolEngine(mock(SocketAddress.class)).getConnectionId();
-
-        //create a protocol engine and send the AMQP header for all supported AMQP verisons,
-        //ensuring the ID assigned increases as expected
-        for(Protocol protocol : protocols)
-        {
-            long expectedID = previousId + 1;
-            byte[] header = getAmqpHeader(protocol);
-            assertNotNull("protocol header should not be null", header);
-
-            ProtocolEngine engine = factory.newProtocolEngine(null);
-            TestNetworkConnection conn = new TestNetworkConnection();
-            engine.setNetworkConnection(conn, conn.getSender());
-            assertEquals("ID did not increment as expected", expectedID, engine.getConnectionId());
-
-            //actually feed in the AMQP header for this protocol version, and ensure the ID remains consistent
-            engine.received(ByteBuffer.wrap(header));
-            assertEquals("ID was not as expected following receipt of the AMQP version header", expectedID, engine.getConnectionId());
-
-            previousId = expectedID;
-            engine.closed();
-        }
-    }
-
     protected Set<Protocol> getAllAMQPProtocols()
     {
         Set<Protocol> protocols = EnumSet.allOf(Protocol.class);

Modified: 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
 (original)
+++ 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
 Sat Jun 27 21:13:25 2015
@@ -59,7 +59,6 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
@@ -69,6 +68,7 @@ import org.apache.qpid.server.queue.Last
 import org.apache.qpid.server.queue.PriorityQueue;
 import org.apache.qpid.server.queue.PriorityQueueImpl;
 import org.apache.qpid.server.queue.StandardQueueImpl;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.BrokerTestHelper;
@@ -676,8 +676,8 @@ public class VirtualHostMessageStoreTest
         queueArguments.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
         queueArguments.put(Queue.EXCLUSIVE, exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE);
         AMQSessionModel sessionModel = mock(AMQSessionModel.class);
-        AMQConnectionModel connectionModel = mock(AMQConnectionModel.class);
-        when(sessionModel.getConnectionModel()).thenReturn(connectionModel);
+        AMQPConnection connectionModel = mock(AMQPConnection.class);
+        when(sessionModel.getAMQPConnection()).thenReturn(connectionModel);
         when(connectionModel.getRemoteContainerName()).thenReturn(queueOwner);
         SessionPrincipal principal = new SessionPrincipal(sessionModel);
         AMQQueue<?> queue = Subject.doAs(new Subject(true,

Modified: 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostLoggerRestTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostLoggerRestTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostLoggerRestTest.java
 (original)
+++ 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostLoggerRestTest.java
 Sat Jun 27 21:13:25 2015
@@ -29,7 +29,6 @@ import java.util.Map;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.qpid.server.logging.VirtualHostFileLogger;
-import org.apache.qpid.server.logging.messages.QueueMessages;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHostLogger;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to