Added:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1687962&view=auto
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
(added)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
Sat Jun 27 21:13:25 2015
@@ -0,0 +1,695 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.PrivilegedAction;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.Subject;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.amqp_1_0.codec.FrameWriter;
+import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
+import org.apache.qpid.amqp_1_0.framing.AMQFrame;
+import org.apache.qpid.amqp_1_0.framing.FrameHandler;
+import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
+import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
+import org.apache.qpid.amqp_1_0.transport.SaslServerProvider;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
+import
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+public class AMQPConnection_1_0 extends
AbstractAMQPConnection<AMQPConnection_1_0>
+ implements FrameOutputHandler
+{
+
+    /** Class logger; declared final so the shared reference cannot be reassigned. */
+    public static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class);
+
+    /**
+     * Delay in milliseconds that close() adds to the current time when it
+     * schedules the ConnectionClosingTicker. The misspelt name is retained
+     * because the constant is public.
+     */
+    public static final long CLOSE_REPONSE_TIMEOUT = 10000L;
+
+ private volatile long _lastReadTime;
+ private volatile long _lastWriteTime;
+ private long _createTime = System.currentTimeMillis();
+ private ConnectionEndpoint _endpoint;
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ProtocolEngine>> _workListener = new
AtomicReference<>();
+
+
+ // Eight-byte header sent to the peer when the connection starts with the
+ // SASL layer: the literal "AMQP" followed by the bytes 3, 1, 0, 0.
+ // Shared static buffer: callers send a duplicate() of it.
+ private static final ByteBuffer SASL_LAYER_HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 3,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+ // Eight-byte header for the plain AMQP layer: the literal "AMQP" followed
+ // by the bytes 0, 1, 0, 0. Sent directly for non-SASL connections and
+ // again once SASL negotiation has authenticated the peer.
+ private static final ByteBuffer AMQP_LAYER_HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+
+ private FrameWriter _frameWriter;
+ private ProtocolHandler _frameHandler;
+ private Object _sendLock = new Object();
+ private byte _major;
+ private byte _minor;
+ private byte _revision;
+ private Connection_1_0 _connection;
+ private volatile boolean _transportBlockedForWriting;
+
+
+ // Position of the inbound parser within the eight-byte protocol header.
+ // Each of the first eight constants names the header byte that received()
+ // expects next; FRAME means the header has been consumed and all further
+ // bytes go to the frame handler.
+ static enum State {
+ A,
+ M,
+ Q,
+ P,
+ PROTOCOL,
+ MAJOR,
+ MINOR,
+ REVISION,
+ FRAME
+ }
+
+ private State _state = State.A;
+
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new
AtomicReference<>();
+
+
+
+
+ /**
+ * Creates the protocol engine for one AMQP 1.0 network connection, wires it
+ * to a new Connection_1_0 and its ConnectionEndpoint, and immediately sends
+ * the protocol header to the peer.
+ *
+ * @param broker the broker, passed to the superclass and to createConnection
+ * @param network the underlying network connection
+ * @param port the port; its authentication provider supplies the mechanisms
+ * @param transport the transport; isSecure() selects the subject creator
+ * @param id the connection id
+ * @param aggregateTicker ticker passed to the superclass
+ * @param useSASL true to start with the SASL layer header and SASL frame
+ * handling, false to start directly with the AMQP layer
+ */
+ public AMQPConnection_1_0(final Broker<?> broker, final NetworkConnection
network,
+ AmqpPort<?> port, Transport transport, long id,
+ final AggregateTicker aggregateTicker,
+ final boolean useSASL)
+ {
+ super(broker, network, port, transport, id, aggregateTicker);
+ _connection = createConnection(broker, network, port, transport, id,
useSASL);
+
+ // Cross-link this engine, the connection model and the endpoint.
+ // NOTE(review): 'this' is handed out here before _frameWriter and
+ // _frameHandler are assigned below.
+ _connection.setAmqpConnection(this);
+ _endpoint = _connection.getConnectionEndpoint();
+ _endpoint.setConnectionEventListener(_connection);
+ _endpoint.setFrameOutputHandler(this);
+ final List<String> mechanisms =
port.getAuthenticationProvider().getSubjectCreator(transport.isSecure()).getMechanisms();
+ // Both initiate* methods install _frameHandler and return the header to send.
+ ByteBuffer headerResponse = useSASL ? initiateSasl() :
initiateNonSasl(mechanisms);
+
+ _frameWriter = new FrameWriter(_endpoint.getDescribedTypeRegistry());
+
+ // headerResponse is one of the shared static header buffers, so a
+ // duplicate is sent to leave the original's position untouched.
+ getSender().send(headerResponse.duplicate());
+ getSender().flush();
+
+ if(useSASL)
+ {
+ _endpoint.initiateSASL(mechanisms.toArray(new
String[mechanisms.size()]));
+ }
+
+
+ }
+
+    /**
+     * Builds the Connection_1_0 model object together with its
+     * ConnectionEndpoint.
+     *
+     * The endpoint is given a SASL server provider only when useSASL is set,
+     * a frame-receipt logger that writes to FRAME_LOGGER at debug level, the
+     * broker's product/version/build/instance-name properties, and the remote
+     * address of the network connection.
+     *
+     * @param broker    supplies the container id and the instance name
+     * @param network   supplies the remote address and the peer principal
+     * @param port      supplies the authentication provider
+     * @param transport isSecure() selects the subject creator
+     * @param id        connection id passed to the Connection_1_0
+     * @param useSASL   whether the endpoint should be able to create SASL servers
+     * @return the newly created connection wrapping the configured endpoint
+     */
+    public static Connection_1_0 createConnection(final Broker<?> broker,
+                                                  final NetworkConnection network,
+                                                  final AmqpPort<?> port,
+                                                  final Transport transport,
+                                                  final long id,
+                                                  final boolean useSASL)
+    {
+        final Container brokerContainer = new Container(broker.getId().toString());
+        final SubjectCreator creator =
+                port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
+
+        // Without SASL the endpoint is constructed with no provider at all.
+        SaslServerProvider saslProvider = null;
+        if (useSASL)
+        {
+            saslProvider = asSaslServerProvider(creator, network);
+        }
+        final ConnectionEndpoint connectionEndpoint = new ConnectionEndpoint(brokerContainer, saslProvider);
+
+        connectionEndpoint.setLogger(new ConnectionEndpoint.FrameReceiptLogger()
+        {
+            @Override
+            public boolean isEnabled()
+            {
+                return FRAME_LOGGER.isDebugEnabled();
+            }
+
+            @Override
+            public void received(final SocketAddress remoteAddress, final short channel, final Object frame)
+            {
+                FRAME_LOGGER.debug("RECV[" + remoteAddress + "|" + channel + "] : " + frame);
+            }
+        });
+
+        // LinkedHashMap keeps the properties in insertion order.
+        final Map<Symbol, Object> advertised = new LinkedHashMap<>();
+        advertised.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), QpidProperties.getProductName());
+        advertised.put(Symbol.valueOf(ServerPropertyNames.VERSION), QpidProperties.getReleaseVersion());
+        advertised.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), QpidProperties.getBuildVersion());
+        advertised.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName());
+        connectionEndpoint.setProperties(advertised);
+
+        connectionEndpoint.setRemoteAddress(network.getRemoteAddress());
+
+        return new Connection_1_0(connectionEndpoint, id, port, transport, creator);
+    }
+
+
+ /**
+ * @return the byte sender of the underlying network connection
+ */
+ public ByteBufferSender getSender()
+ {
+ return getNetwork().getSender();
+ }
+
+    /**
+     * Prepares the connection for use without a SASL layer.
+     *
+     * The user principal is taken from the transport's peer principal when the
+     * external mechanism is offered and a peer principal exists; otherwise the
+     * anonymous principal is used when the anonymous mechanism is offered. If
+     * neither applies the network connection is closed. In every case the
+     * plain AMQP frame handler is installed and the AMQP layer header is
+     * returned, exactly as before.
+     *
+     * @param mechanisms the mechanism names offered by the authentication provider
+     * @return the shared AMQP layer header buffer (callers send a duplicate)
+     */
+    public ByteBuffer initiateNonSasl(final List<String> mechanisms)
+    {
+        // Fetch the peer principal once so the null check and the value that
+        // is wrapped below cannot disagree.
+        final Principal peerPrincipal =
+                mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME)
+                        ? getNetwork().getPeerPrincipal()
+                        : null;
+
+        if (peerPrincipal != null)
+        {
+            _connection.setUserPrincipal(new AuthenticatedPrincipal(peerPrincipal));
+        }
+        else if (mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
+        {
+            _connection.setUserPrincipal(
+                    new AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
+        }
+        else
+        {
+            // Leave a trace of why the connection is being dropped.
+            LOGGER.warn("Closing non-SASL connection from {}: no usable {} or {} authentication",
+                        getNetwork().getRemoteAddress(),
+                        ExternalAuthenticationManagerImpl.MECHANISM_NAME,
+                        AnonymousAuthenticationManager.MECHANISM_NAME);
+            getNetwork().close();
+        }
+
+        _frameHandler = new FrameHandler(_endpoint);
+        return AMQP_LAYER_HEADER;
+    }
+
+    /**
+     * Prepares the connection to negotiate SASL before AMQP.
+     *
+     * Registers this object as the endpoint's SASL frame output and installs
+     * a completion callback: once negotiation finishes, an authenticated peer
+     * is sent the AMQP layer header, while an unauthenticated one has its
+     * network connection closed. The SASL frame handler becomes the current
+     * frame handler.
+     *
+     * @return the shared SASL layer header buffer (callers send a duplicate)
+     */
+    public ByteBuffer initiateSasl()
+    {
+        _endpoint.setSaslFrameOutput(this);
+
+        final Runnable onSaslComplete = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                if (!_endpoint.isAuthenticated())
+                {
+                    getNetwork().close();
+                    return;
+                }
+                // duplicate() keeps the shared static header buffer reusable.
+                getSender().send(AMQP_LAYER_HEADER.duplicate());
+                getSender().flush();
+            }
+        };
+        _endpoint.setOnSaslComplete(onSaslComplete);
+
+        _frameHandler = new SASLFrameHandler(_endpoint);
+        return SASL_LAYER_HEADER;
+    }
+
+    /**
+     * Reports whether message assignment is suspended from the calling
+     * thread's point of view.
+     *
+     * @return true when some thread other than the caller has suspended
+     *         assignment; false when nothing is suspended or when the caller
+     *         is the thread that suspended it
+     */
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        // Read the reference exactly once. Reading it a second time could
+        // observe a concurrent reset to null, and null != currentThread()
+        // would then wrongly report the connection as suspended.
+        final Thread suspendingThread = _messageAssignmentSuspended.get();
+        return suspendingThread != null && suspendingThread != Thread.currentThread();
+    }
+
+    /**
+     * Suspends or resumes message assignment for this connection.
+     *
+     * The calling thread is recorded as the suspender (or the record is
+     * cleared on resume), then every consumer of every session is visited.
+     *
+     * @param messageAssignmentSuspended true to suspend, false to resume
+     */
+    @Override
+    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+    {
+        final Thread owner = messageAssignmentSuspended ? Thread.currentThread() : null;
+        _messageAssignmentSuspended.set(owner);
+
+        for (AMQSessionModel<?> sessionModel : _connection.getSessionModels())
+        {
+            for (Consumer<?> candidate : sessionModel.getConsumers())
+            {
+                final ConsumerImpl target = (ConsumerImpl) candidate;
+                if (messageAssignmentSuspended)
+                {
+                    // Acquiring and releasing the send lock guarantees that,
+                    // by the time this method returns, no consumer can still
+                    // be part-way through delivering a message.
+                    target.getSendLock();
+                    target.releaseSendLock();
+                }
+                else
+                {
+                    target.getTarget().notifyCurrentState();
+                }
+            }
+        }
+    }
+
+
+ /** Writer-idle notification; currently does nothing. */
+ public void writerIdle()
+ {
+ //Todo
+ }
+
+ /** Reader-idle notification; currently does nothing. */
+ public void readerIdle()
+ {
+ //Todo
+ }
+
+ /** Intentionally empty: this implementation takes no action here. */
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
+ /**
+ * Adapts a SubjectCreator to the SaslServerProvider interface expected by
+ * the ConnectionEndpoint.
+ *
+ * @param subjectCreator used to create SASL servers
+ * @param network its peer principal is passed along when a SASL server is created
+ * @return a provider backed by the given subject creator
+ */
+ private static SaslServerProvider asSaslServerProvider(final
SubjectCreator subjectCreator,
+ final
NetworkConnection network)
+ {
+ return new SaslServerProvider()
+ {
+ @Override
+ public SaslServer getSaslServer(String mechanism, String fqdn)
throws SaslException
+ {
+ // The peer principal is read at call time, not when the provider is built.
+ return subjectCreator.createSaslServer(mechanism, fqdn,
network.getPeerPrincipal());
+ }
+
+ @Override
+ public Principal getAuthenticatedPrincipal(SaslServer server)
+ {
+ // Wraps the server's authorization id as the authenticated user.
+ return new AuthenticatedPrincipal(new
UsernamePrincipal(server.getAuthorizationID()));
+ }
+ };
+ }
+
+ /**
+ * @return the string form of the network connection's remote address
+ */
+ public String getAddress()
+ {
+ return getNetwork().getRemoteAddress().toString();
+ }
+
+    /**
+     * Logger named "RAW", used for dumps of the raw bytes received and sent.
+     * Static, like FRAME_LOGGER, so that it is not held as a separate field
+     * on every connection instance.
+     */
+    private static final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW");
+
+
+ /**
+ * Consumes bytes arriving from the network.
+ *
+ * The first eight bytes of the stream are treated as the protocol header:
+ * the first five are read and discarded, the next three are stored in
+ * _major, _minor and _revision. Everything after that is handed to the
+ * current frame handler, running as the connection's Subject. The header
+ * may arrive split across several calls; _state remembers which header
+ * byte is expected next, and the switch below falls through from that
+ * point for as long as bytes remain.
+ *
+ * Any RuntimeException is logged and the network connection is closed.
+ *
+ * NOTE(review): the header bytes are consumed without being compared
+ * against "AMQP" or an expected protocol id/version.
+ *
+ * @param msg buffer holding the newly received bytes
+ */
+ public synchronized void received(final ByteBuffer msg)
+ {
+ try
+ {
+ _lastReadTime = System.currentTimeMillis();
+ if(RAW_LOGGER.isDebugEnabled())
+ {
+ // Dump from a duplicate so msg's own position is not moved.
+ ByteBuffer dup = msg.duplicate();
+ byte[] data = new byte[dup.remaining()];
+ dup.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() +
"] : " + bin.toString());
+ }
+ // Every case deliberately falls through to the next when a byte was
+ // available; when the buffer runs dry the state is saved and we break.
+ switch(_state)
+ {
+ case A:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ // _state is already A here, so nothing needs saving.
+ break;
+ }
+ case M:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.M;
+ break;
+ }
+
+ case Q:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.Q;
+ break;
+ }
+ case P:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.P;
+ break;
+ }
+ case PROTOCOL:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.PROTOCOL;
+ break;
+ }
+ case MAJOR:
+ if (msg.hasRemaining())
+ {
+ _major = msg.get();
+ }
+ else
+ {
+ _state = State.MAJOR;
+ break;
+ }
+ case MINOR:
+ if (msg.hasRemaining())
+ {
+ _minor = msg.get();
+ }
+ else
+ {
+ _state = State.MINOR;
+ break;
+ }
+ case REVISION:
+ if (msg.hasRemaining())
+ {
+ _revision = msg.get();
+
+ // Header complete: all later calls enter the switch at FRAME.
+ _state = State.FRAME;
+ }
+ else
+ {
+ _state = State.REVISION;
+ break;
+ }
+ case FRAME:
+ if (msg.hasRemaining())
+ {
+ Subject.doAs(_connection.getSubject(), new
PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ // parse() returns the handler to use for subsequent input.
+ _frameHandler = _frameHandler.parse(msg);
+ return null;
+ }
+ });
+
+ }
+ }
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error("Exception while processing incoming data", e);
+ getNetwork().close();
+ }
+ }
+
+
+ /**
+ * Handles closure of the connection: tells the endpoint its input has
+ * closed and then, even if that throws, notifies the endpoint's connection
+ * event listener. A RuntimeException from either step is logged and the
+ * network connection is closed.
+ */
+ public void closed()
+ {
+ try
+ {
+ try
+ {
+ _endpoint.inputClosed();
+ }
+ finally
+ {
+ // NOTE(review): _endpoint is dereferenced unguarded just above,
+ // so the null test here only matters for the listener itself.
+ if (_endpoint != null &&
_endpoint.getConnectionEventListener() != null)
+ {
+ ((Connection_1_0)
_endpoint.getConnectionEventListener()).closed();
+ }
+ }
+ }
+ catch(RuntimeException e)
+ {
+
+ LOGGER.error("Exception while closing", e);
+ getNetwork().close();
+ }
+ }
+
+ /**
+ * Moves the underlying network connection to another scheduler.
+ * The network connection is cast to NonBlockingConnection, so any other
+ * implementation results in a ClassCastException.
+ *
+ * @param networkConnectionScheduler the scheduler to switch to
+ */
+ void changeScheduler(final NetworkConnectionScheduler
networkConnectionScheduler)
+ {
+ ((NonBlockingConnection)
getNetwork()).changeScheduler(networkConnectionScheduler);
+ }
+
+
+ /**
+ * @return the System.currentTimeMillis() value captured when this object was created
+ */
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+
+ /**
+ * @return always true
+ */
+ public boolean canSend()
+ {
+ return true;
+ }
+
+ /**
+ * Sends a frame; equivalent to send(amqFrame, null).
+ *
+ * @param amqFrame the frame to encode and send
+ */
+ public void send(final AMQFrame amqFrame)
+ {
+ send(amqFrame, null);
+ }
+
+ private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM");
+
+
+    /**
+     * Encodes a frame and writes it to the network.
+     *
+     * All of the work happens while holding _sendLock. The frame is encoded
+     * into a freshly allocated buffer of the endpoint's maximum frame size,
+     * then sent and flushed.
+     *
+     * @param amqFrame the frame to encode and send
+     * @param buf      not used by this implementation
+     * @throws OversizeFrameException if the encoded size reported by the
+     *         frame writer exceeds the endpoint's maximum frame size
+     */
+    public void send(final AMQFrame amqFrame, ByteBuffer buf)
+    {
+        synchronized (_sendLock)
+        {
+            _lastWriteTime = System.currentTimeMillis();
+
+            if (FRAME_LOGGER.isDebugEnabled())
+            {
+                FRAME_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "|" + amqFrame.getChannel()
+                                   + "] : " + amqFrame.getFrameBody());
+            }
+
+            _frameWriter.setValue(amqFrame);
+
+            final ByteBuffer encoded = ByteBuffer.allocate(_endpoint.getMaxFrameSize());
+            final int encodedSize = _frameWriter.writeToBuffer(encoded);
+            if (encodedSize > _endpoint.getMaxFrameSize())
+            {
+                throw new OversizeFrameException(amqFrame, encodedSize);
+            }
+
+            // Switch the buffer from writing to reading before it is handed on.
+            encoded.flip();
+
+            if (RAW_LOGGER.isDebugEnabled())
+            {
+                // Copy out through a duplicate so 'encoded' keeps its position.
+                final byte[] rawBytes = new byte[encoded.remaining()];
+                encoded.duplicate().get(rawBytes);
+                final Binary payload = new Binary(rawBytes);
+                RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + payload.toString());
+            }
+
+            getSender().send(encoded);
+            getSender().flush();
+        }
+    }
+
+ /**
+ * Wraps a frame body in an AMQFrame for the given channel and sends it.
+ *
+ * @param channel the channel the frame belongs to
+ * @param body the frame body to send
+ */
+ public void send(short channel, FrameBody body)
+ {
+ AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
+ send(frame);
+
+ }
+
+ /**
+ * Runs the superclass delete tasks unchanged.
+ * NOTE(review): presumably redeclared here so that classes in this package
+ * can call the protected method — confirm against the callers.
+ */
+ @Override
+ protected void performDeleteTasks()
+ {
+ super.performDeleteTasks();
+ }
+
+ /**
+ * Registers a ConnectionClosingTicker for the network connection, due
+ * CLOSE_REPONSE_TIMEOUT milliseconds from now. This method does not call
+ * close() on the network itself.
+ * NOTE(review): presumably the ticker closes the network once the deadline
+ * passes — confirm against ConnectionClosingTicker.
+ */
+ public void close()
+ {
+ getAggregateTicker().addTicker(new
ConnectionClosingTicker(System.currentTimeMillis()+ CLOSE_REPONSE_TIMEOUT,
+
getNetwork()));
+
+ }
+
+ /**
+ * @return the System.currentTimeMillis() value recorded by the latest
+ * received() call, or 0 if nothing has been received yet
+ */
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ /**
+ * @return the System.currentTimeMillis() value recorded by the latest
+ * frame send, or 0 if no frame has been sent yet
+ */
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
+ /**
+ * @return the flag last stored by setTransportBlockedForWriting
+ */
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+ /**
+ * Records whether the transport is blocked for writing and tells the
+ * connection that the transport state has changed.
+ *
+ * @param blocked the new blocked state
+ */
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ _connection.transportStateChanged();
+
+ }
+
+ /** Delegates pending-work processing to the underlying Connection_1_0. */
+ @Override
+ public void processPending()
+ {
+ _connection.processPending();
+
+ }
+
+ /**
+ * @return true if notifyWork() has been called since the last clearWork()
+ */
+ @Override
+ public boolean hasWork()
+ {
+ return _stateChanged.get();
+ }
+
+ /**
+ * Marks this engine as having work and, if a work listener is registered,
+ * invokes it with this engine.
+ */
+ @Override
+ public void notifyWork()
+ {
+ // Set the flag first so the listener sees hasWork() == true.
+ _stateChanged.set(true);
+
+ final Action<ProtocolEngine> listener = _workListener.get();
+ if(listener != null)
+ {
+ listener.performAction(this);
+ }
+ }
+
+ /** Resets the flag reported by hasWork(). */
+ @Override
+ public void clearWork()
+ {
+ _stateChanged.set(false);
+ }
+
+ /**
+ * Registers the listener invoked by notifyWork(), replacing any previous one.
+ *
+ * @param listener the listener to invoke; null removes the current listener
+ */
+ @Override
+ public void setWorkListener(final Action<ProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ }
+
+ /**
+ * @param name ignored by this implementation
+ * @return always false
+ */
+ public boolean hasSessionWithName(final byte[] name)
+ {
+ return false;
+ }
+
+ /**
+ * Delegates an asynchronous close request to the underlying Connection_1_0.
+ *
+ * @param cause the reason code, passed through
+ * @param message the reason text, passed through
+ */
+ public void closeAsync(final AMQConstant cause, final String message)
+ {
+ _connection.closeAsync(cause, message);
+ }
+
+ /**
+ * @return the authorized principal reported by the underlying Connection_1_0
+ */
+ public Principal getAuthorizedPrincipal()
+ {
+ return _connection.getAuthorizedPrincipal();
+ }
+
+ /**
+ * Delegates an asynchronous session close to the underlying Connection_1_0.
+ * The session is cast to Session_1_0, so any other implementation results
+ * in a ClassCastException.
+ *
+ * @param session the session to close
+ * @param cause the reason code, passed through
+ * @param message the reason text, passed through
+ */
+ public void closeSessionAsync(final AMQSessionModel<?> session,
+ final AMQConstant cause, final String
message)
+ {
+ _connection.closeSessionAsync((Session_1_0) session, cause, message);
+ }
+
+ /** Delegates to the underlying Connection_1_0. */
+ public void block()
+ {
+ _connection.block();
+ }
+
+ /**
+ * @return the remote container name reported by the underlying Connection_1_0
+ */
+ public String getRemoteContainerName()
+ {
+ return _connection.getRemoteContainerName();
+ }
+
+ /**
+ * @return the virtual host reported by the underlying Connection_1_0
+ */
+ public VirtualHost<?, ?, ?> getVirtualHost()
+ {
+ return _connection.getVirtualHost();
+ }
+
+ /**
+ * @return the session list reported by the underlying Connection_1_0
+ */
+ public List<Session_1_0> getSessionModels()
+ {
+ return _connection.getSessionModels();
+ }
+
+ /** Delegates to the underlying Connection_1_0. */
+ public void unblock()
+ {
+ _connection.unblock();
+ }
+
+ /**
+ * @return the log subject of the underlying Connection_1_0
+ */
+ public LogSubject getLogSubject()
+ {
+ return _connection.getLogSubject();
+ }
+
+ /**
+ * @return the session count limit reported by the underlying Connection_1_0
+ */
+ public long getSessionCountLimit()
+ {
+ return _connection.getSessionCountLimit();
+ }
+
+}
Propchange:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
Sat Jun 27 21:13:25 2015
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
import static
org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
-import java.net.SocketAddress;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
@@ -31,10 +30,10 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
import javax.security.auth.Subject;
@@ -43,46 +42,32 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
+import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.End;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
-import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-public class Connection_1_0 implements ConnectionEventListener,
AMQConnectionModel<Connection_1_0,Session_1_0>
+public class Connection_1_0 implements ConnectionEventListener
{
private final AmqpPort<?> _port;
- private final Broker<?> _broker;
private final SubjectCreator _subjectCreator;
- private final ProtocolEngine_1_0_0 _protocolEngine;
- private VirtualHostImpl _vhost;
+ private AMQPConnection_1_0 _amqpConnection;
+ private VirtualHostImpl<?,?,?> _vhost;
private final Transport _transport;
- private final ConnectionEndpoint _conn;
+ private final ConnectionEndpoint _connectionEndpoint;
private final long _connectionId;
private final Collection<Session_1_0> _sessions =
Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
- private final Subject _subject = new Subject();
-
- private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners
=
- new CopyOnWriteArrayList<SessionModelListener>();
-
- private final StatisticsCounter _messageDeliveryStatistics,
_messageReceiptStatistics, _dataDeliveryStatistics, _dataReceiptStatistics;
private final LogSubject _logSubject = new LogSubject()
{
@@ -100,42 +85,34 @@ public class Connection_1_0 implements C
}
};
- private volatile boolean _stopped;
-
-
- private List<Action<? super Connection_1_0>> _closeTasks =
- Collections.synchronizedList(new ArrayList<Action<? super
Connection_1_0>>());
-
private final Queue<Action<? super Connection_1_0>> _asyncTaskList =
new ConcurrentLinkedQueue<>();
private boolean _closedOnOpen;
- private ConnectionAdapter _adapter;
- public Connection_1_0(Broker<?> broker,
- ConnectionEndpoint conn,
+ public Connection_1_0(ConnectionEndpoint connectionEndpoint,
long connectionId,
AmqpPort<?> port,
Transport transport,
- final SubjectCreator subjectCreator,
- final ProtocolEngine_1_0_0 protocolEngine)
+ final SubjectCreator subjectCreator)
{
- _protocolEngine = protocolEngine;
- _broker = broker;
_port = port;
_transport = transport;
- _conn = conn;
+ _connectionEndpoint = connectionEndpoint;
_connectionId = connectionId;
- _subject.getPrincipals().add(new ConnectionPrincipal(this));
_subjectCreator = subjectCreator;
- _messageDeliveryStatistics = new
StatisticsCounter("messages-delivered-" + getConnectionId());
- _dataDeliveryStatistics = new StatisticsCounter("data-delivered-" +
getConnectionId());
- _messageReceiptStatistics = new StatisticsCounter("messages-received-"
+ getConnectionId());
- _dataReceiptStatistics = new StatisticsCounter("data-received-" +
getConnectionId());
- _adapter = new ConnectionAdapter(this);
- _adapter.create();
+ }
+
+ void setAmqpConnection(final AMQPConnection_1_0 amqpConnection)
+ {
+ _amqpConnection = amqpConnection;
+ }
+
+ public ConnectionEndpoint getConnectionEndpoint()
+ {
+ return _connectionEndpoint;
}
public Object getReference()
@@ -146,35 +123,48 @@ public class Connection_1_0 implements C
@Override
public void openReceived()
{
- String host = _conn.getLocalHostname();
+ String host = _connectionEndpoint.getLocalHostname();
+ Map clientProperties = _connectionEndpoint.getRemoteProperties();
+ if(clientProperties != null)
+ {
+ if(clientProperties.containsKey(Symbol.valueOf("product")))
+ {
+
_amqpConnection.setClientProduct(clientProperties.get(Symbol.valueOf("product")).toString());
+ }
+ if(clientProperties.containsKey(Symbol.valueOf("version")))
+ {
+
_amqpConnection.setClientVersion(clientProperties.get(Symbol.valueOf("version")).toString());
+ }
+
_amqpConnection.setClientId(_connectionEndpoint.getRemoteContainerId());
+ }
_vhost = ((AmqpPort)_port).getVirtualHost(host);
if(_vhost == null)
{
final Error err = new Error();
err.setCondition(AmqpError.NOT_FOUND);
err.setDescription("Unknown hostname in connection open: '" + host
+ "'");
- _conn.close(err);
+ _connectionEndpoint.close(err);
_closedOnOpen = true;
}
else
{
- final Principal user = _conn.getUser();
+ final Principal user = _connectionEndpoint.getUser();
if(user != null)
{
setUserPrincipal(user);
}
- _subject.getPrincipals().add(_vhost.getPrincipal());
-
if(AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_subject)
== null)
+
_amqpConnection.getSubject().getPrincipals().add(_vhost.getPrincipal());
+
if(AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject())
== null)
{
final Error err = new Error();
err.setCondition(AmqpError.NOT_ALLOWED);
err.setDescription("Connection has not been authenticated");
- _conn.close(err);
+ _connectionEndpoint.close(err);
_closedOnOpen = true;
}
else
{
- _adapter.virtualHostAssociated();
+ _amqpConnection.virtualHostAssociated();
}
}
}
@@ -182,9 +172,9 @@ public class Connection_1_0 implements C
void setUserPrincipal(final Principal user)
{
Subject authSubject = _subjectCreator.createSubjectWithGroups(user);
- _subject.getPrincipals().addAll(authSubject.getPrincipals());
-
_subject.getPublicCredentials().addAll(authSubject.getPublicCredentials());
-
_subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
+
_amqpConnection.getSubject().getPrincipals().addAll(authSubject.getPrincipals());
+
_amqpConnection.getSubject().getPublicCredentials().addAll(authSubject.getPublicCredentials());
+
_amqpConnection.getSubject().getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
}
public void remoteSessionCreation(SessionEndpoint endpoint)
@@ -193,7 +183,7 @@ public class Connection_1_0 implements C
{
final Session_1_0 session = new Session_1_0(this, endpoint);
_sessions.add(session);
- sessionAdded(session);
+ _amqpConnection.sessionAdded(session);
endpoint.setSessionEventListener(new SessionEventListener()
{
@Override
@@ -231,22 +221,11 @@ public class Connection_1_0 implements C
{
if(!_closedOnOpen)
{
-
_sessions.remove(session);
- sessionRemoved(session);
+ _amqpConnection.sessionRemoved(session);
}
}
- public void removeDeleteTask(final Action<? super Connection_1_0> task)
- {
- _closeTasks.remove( task );
- }
-
- public void addDeleteTask(final Action<? super Connection_1_0> task)
- {
- _closeTasks.add( task );
- }
-
private void addAsyncTask(final Action<Connection_1_0> action)
{
_asyncTaskList.add(action);
@@ -256,7 +235,7 @@ public class Connection_1_0 implements C
public void closeReceived()
{
- Collection<Session_1_0> sessions = new ArrayList(_sessions);
+ Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
for(Session_1_0 session : sessions)
{
@@ -265,7 +244,7 @@ public class Connection_1_0 implements C
if(_vhost != null)
{
- _vhost.deregisterConnection(_adapter);
+ _vhost.deregisterConnection(_amqpConnection);
}
@@ -273,29 +252,16 @@ public class Connection_1_0 implements C
void performCloseTasks()
{
- List<Action<? super Connection_1_0>> taskCopy;
-
- synchronized (_closeTasks)
- {
- taskCopy = new ArrayList<Action<? super
Connection_1_0>>(_closeTasks);
- }
- for(Action<? super Connection_1_0> task : taskCopy)
- {
- task.performAction(this);
- }
- synchronized (_closeTasks)
- {
- _closeTasks.clear();
- }
+ _amqpConnection.performDeleteTasks();
}
public void closed()
{
+ performCloseTasks();
closeReceived();
}
- @Override
public void closeAsync(AMQConstant cause, String message)
{
Action<Connection_1_0> action = new Action<Connection_1_0>()
@@ -303,27 +269,23 @@ public class Connection_1_0 implements C
@Override
public void performAction(final Connection_1_0 object)
{
- _conn.close();
+ _connectionEndpoint.close();
}
};
addAsyncTask(action);
-
}
- @Override
public void block()
{
// TODO
}
- @Override
public void unblock()
{
// TODO
}
- @Override
public void closeSessionAsync(final Session_1_0 session, final AMQConstant
cause, final String message)
{
addAsyncTask(new Action<Connection_1_0>()
@@ -336,187 +298,65 @@ public class Connection_1_0 implements C
});
}
- @Override
public long getConnectionId()
{
return _connectionId;
}
- @Override
public List<Session_1_0> getSessionModels()
{
- return new ArrayList<Session_1_0>(_sessions);
+ return new ArrayList<>(_sessions);
}
- @Override
public LogSubject getLogSubject()
{
return _logSubject;
}
- @Override
public String getRemoteAddressString()
{
- return String.valueOf(_conn.getRemoteAddress());
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _conn.getRemoteAddress();
- }
-
- @Override
- public String getRemoteProcessPid()
- {
- return null; // TODO
+ return String.valueOf(_connectionEndpoint.getRemoteAddress());
}
- @Override
public String getClientId()
{
- return _conn.getRemoteContainerId();
+ return _connectionEndpoint.getRemoteContainerId();
}
- @Override
public String getRemoteContainerName()
{
- return _conn.getRemoteContainerId();
- }
-
- @Override
- public String getClientVersion()
- {
- return ""; //TODO
- }
-
- @Override
- public String getClientProduct()
- {
- return ""; //TODO
+ return _connectionEndpoint.getRemoteContainerId();
}
public Principal getAuthorizedPrincipal()
{
- Set<AuthenticatedPrincipal> authPrincipals =
_subject.getPrincipals(AuthenticatedPrincipal.class);
+ Set<AuthenticatedPrincipal> authPrincipals =
_amqpConnection.getSubject().getPrincipals(AuthenticatedPrincipal.class);
return authPrincipals.isEmpty() ? null :
authPrincipals.iterator().next();
}
- @Override
public long getSessionCountLimit()
{
return 0; // TODO
}
- @Override
- public long getLastIoTime()
- {
- return 0; // TODO
- }
-
- @Override
- public String getVirtualHostName()
- {
- return _vhost == null ? null : _vhost.getName();
- }
-
- @Override
public AmqpPort<?> getPort()
{
return _port;
}
- @Override
- public ProtocolEngine getProtocolEngine()
- {
- return _protocolEngine;
- }
-
- @Override
- public void setScheduler(final NetworkConnectionScheduler
networkConnectionScheduler)
+ public AMQPConnection_1_0 getAmqpConnection()
{
- _protocolEngine.changeScheduler(networkConnectionScheduler);
+ return _amqpConnection;
}
- @Override
public Transport getTransport()
{
return _transport;
}
- @Override
- public void stop()
- {
- _stopped = true;
- }
-
- @Override
- public boolean isStopped()
- {
- return _stopped;
- }
-
- @Override
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- _messageReceiptStatistics.registerEvent(1L, timestamp);
- _dataReceiptStatistics.registerEvent(messageSize, timestamp);
- _vhost.registerMessageReceived(messageSize,timestamp);
-
- }
-
- @Override
- public void registerMessageDelivered(long messageSize)
- {
-
- _messageDeliveryStatistics.registerEvent(1L);
- _dataDeliveryStatistics.registerEvent(messageSize);
- _vhost.registerMessageDelivered(messageSize);
- }
-
- @Override
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messageDeliveryStatistics;
- }
-
- @Override
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messageReceiptStatistics;
- }
-
- @Override
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDeliveryStatistics;
- }
-
- @Override
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceiptStatistics;
- }
-
- @Override
- public void resetStatistics()
- {
- _dataDeliveryStatistics.reset();
- _dataReceiptStatistics.reset();
- _messageDeliveryStatistics.reset();
- _messageReceiptStatistics.reset();
- }
-
-
-
- AMQConnectionModel getModel()
- {
- return this;
- }
-
-
Subject getSubject()
{
- return _subject;
+ return _amqpConnection.getSubject();
}
public VirtualHostImpl getVirtualHost()
@@ -525,35 +365,6 @@ public class Connection_1_0 implements C
}
- @Override
- public void addSessionListener(final SessionModelListener listener)
- {
- _sessionListeners.add(listener);
- }
-
- @Override
- public void removeSessionListener(final SessionModelListener listener)
- {
- _sessionListeners.remove(listener);
- }
-
- private void sessionAdded(final AMQSessionModel<?,?> session)
- {
- for(SessionModelListener l : _sessionListeners)
- {
- l.sessionAdded(session);
- }
- }
-
- private void sessionRemoved(final AMQSessionModel<?,?> session)
- {
- for(SessionModelListener l : _sessionListeners)
- {
- l.sessionRemoved(session);
- }
- }
-
-
public void transportStateChanged()
{
for (Session_1_0 session : _sessions)
@@ -562,25 +373,18 @@ public class Connection_1_0 implements C
}
}
- @Override
public void notifyWork()
{
- _protocolEngine.notifyWork();
- }
-
- @Override
- public boolean isMessageAssignmentSuspended()
- {
- return _protocolEngine.isMessageAssignmentSuspended();
+ _amqpConnection.notifyWork();
}
public void processPending()
{
- List<? extends AMQSessionModel<?,?>> sessionsWithPending = new
ArrayList<>(getSessionModels());
+ List<? extends AMQSessionModel<?>> sessionsWithPending = new
ArrayList<>(getSessionModels());
while(!sessionsWithPending.isEmpty())
{
- final Iterator<? extends AMQSessionModel<?, ?>> iter =
sessionsWithPending.iterator();
- AMQSessionModel<?, ?> session;
+ final Iterator<? extends AMQSessionModel<?>> iter =
sessionsWithPending.iterator();
+ AMQSessionModel<?> session;
while(iter.hasNext())
{
session = iter.next();
@@ -605,7 +409,7 @@ public class Connection_1_0 implements C
return "Connection_1_0["
+ _connectionId
+ " "
- + _protocolEngine.getRemoteAddress().toString()
+ + _amqpConnection.getAddress()
+ (_vhost == null ? "" : (" vh : " + _vhost.getName()))
+ ']';
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
Sat Jun 27 21:13:25 2015
@@ -92,7 +92,7 @@ class ConsumerTarget_1_0 extends Abstrac
@Override
public boolean doIsSuspended()
{
- return _link.getSession().getConnectionModel().isStopped() ||
getState() != State.ACTIVE;
+ return _link.getSession().getAMQPConnection().isConnectionStopped() ||
getState() != State.ACTIVE;
}
@@ -264,7 +264,7 @@ class ConsumerTarget_1_0 extends Abstrac
}
}
-
getSession().getConnectionModel().registerMessageDelivered(message.getSize());
+
getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
getEndpoint().transfer(transfer);
}
else
@@ -297,7 +297,7 @@ class ConsumerTarget_1_0 extends Abstrac
synchronized (_link.getLock())
{
- ProtocolEngine protocolEngine =
getSession().getConnection().getProtocolEngine();
+ ProtocolEngine protocolEngine =
getSession().getConnection().getAmqpConnection();
final boolean hasCredit = _link.isAttached() &&
getEndpoint().hasCreditToSend() &&
!protocolEngine.isTransportBlockedForWriting();
if(!hasCredit && getState() == State.ACTIVE)
{
@@ -335,7 +335,7 @@ class ConsumerTarget_1_0 extends Abstrac
{
synchronized(_link.getLock())
{
- ProtocolEngine protocolEngine =
getSession().getConnection().getProtocolEngine();
+ ProtocolEngine protocolEngine =
getSession().getConnection().getAmqpConnection();
if(isSuspended() && getEndpoint() != null &&
!protocolEngine.isTransportBlockedForWriting())
{
updateState(State.SUSPENDED, State.ACTIVE);
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
Sat Jun 27 21:13:25 2015
@@ -89,7 +89,7 @@ public class ProtocolEngineCreator_1_0_0
if(supportedMechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME)
||
(supportedMechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME)
&& network.getPeerPrincipal() != null))
{
- return new ProtocolEngine_1_0_0(network, broker, id, port,
transport, aggregateTicker, false);
+ return new AMQPConnection_1_0(broker, network, port, transport,
id, aggregateTicker, false);
}
else
{
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
Sat Jun 27 21:13:25 2015
@@ -65,7 +65,7 @@ public class ProtocolEngineCreator_1_0_0
Transport transport,
long id, final AggregateTicker
aggregateTicker)
{
- return new ProtocolEngine_1_0_0(network, broker, id, port, transport,
aggregateTicker, true);
+ return new AMQPConnection_1_0(broker, network, port, transport, id,
aggregateTicker, true);
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
Sat Jun 27 21:13:25 2015
@@ -215,7 +215,7 @@ public class ReceivingLink_1_0 implement
getEndpoint().updateDisposition(deliveryTag, resultantState,
settled);
-
getSession().getConnectionModel().registerMessageReceived(message.getSize(),
message.getArrivalTime());
+
getSession().getAMQPConnection().registerMessageReceived(message.getSize(),
message.getArrivalTime());
if(!(transaction instanceof AutoCommitTransaction))
{
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Sat Jun 27 21:13:25 2015
@@ -85,6 +85,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -93,7 +94,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.network.Ticker;
-public class Session_1_0 implements SessionEventListener,
AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject
+public class Session_1_0 implements SessionEventListener,
AMQSessionModel<Session_1_0>, LogSubject
{
private static final Logger _logger =
LoggerFactory.getLogger(Session_1_0.class);
private static final Symbol LIFETIME_POLICY =
Symbol.valueOf("lifetime-policy");
@@ -573,9 +574,9 @@ public class Session_1_0 implements Sess
}
@Override
- public Connection_1_0 getConnectionModel()
+ public AMQPConnection<?> getAMQPConnection()
{
- return _connection;
+ return _connection.getAmqpConnection();
}
@Override
@@ -737,9 +738,9 @@ public class Session_1_0 implements Sess
public String toLogString()
{
- long connectionId = getConnectionModel().getConnectionId();
+ long connectionId = getAMQPConnection().getConnectionId();
- String remoteAddress = getConnectionModel().getRemoteAddressString();
+ String remoteAddress = getAMQPConnection().getRemoteAddressString();
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
@@ -943,13 +944,13 @@ public class Session_1_0 implements Sess
@Override
public void addTicker(final Ticker ticker)
{
-
getConnection().getProtocolEngine().getAggregateTicker().addTicker(ticker);
+
getConnection().getAmqpConnection().getAggregateTicker().addTicker(ticker);
}
@Override
public void removeTicker(final Ticker ticker)
{
-
getConnection().getProtocolEngine().getAggregateTicker().removeTicker(ticker);
+
getConnection().getAmqpConnection().getAggregateTicker().removeTicker(ticker);
}
private void consumerAdded(Consumer<?> consumer)
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
Sat Jun 27 21:13:25 2015
@@ -41,6 +41,7 @@ import java.util.UUID;
import javax.security.auth.Subject;
import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
@@ -70,7 +71,7 @@ import org.apache.qpid.transport.network
public class ProtocolEngine_1_0_0Test extends QpidTestCase
{
- private ProtocolEngine_1_0_0 _protocolEngine_1_0_0;
+ private AMQPConnection_1_0 _protocolEngine_1_0_0;
private NetworkConnection _networkConnection;
private Broker<?> _broker;
private AmqpPort _port;
@@ -78,7 +79,7 @@ public class ProtocolEngine_1_0_0Test ex
private AuthenticationProvider _authenticationProvider;
private List<ByteBuffer> _sentBuffers;
private FrameWriter _frameWriter;
- private Connection _connection;
+ private AMQPConnection _connection;
private VirtualHostImpl _virtualHost;
@Override
@@ -103,7 +104,7 @@ public class ProtocolEngine_1_0_0Test ex
_virtualHost = mock(VirtualHostImpl.class);
when(_virtualHost.getChildExecutor()).thenReturn(taskExecutor);
when(_virtualHost.getModel()).thenReturn(BrokerModel.getInstance());
- final ArgumentCaptor<Connection> connectionCaptor =
ArgumentCaptor.forClass(Connection.class);
+ final ArgumentCaptor<AMQPConnection> connectionCaptor =
ArgumentCaptor.forClass(AMQPConnection.class);
doAnswer(new Answer()
{
@Override
@@ -169,7 +170,7 @@ public class ProtocolEngine_1_0_0Test ex
buf.flip();
_protocolEngine_1_0_0.received(buf);
- verify(_virtualHost).registerConnection(any(Connection.class));
+ verify(_virtualHost).registerConnection(any(AMQPConnection.class));
AuthenticatedPrincipal principal = (AuthenticatedPrincipal)
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
assertNotNull(principal);
assertEquals(principal, new
AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
@@ -192,7 +193,7 @@ public class ProtocolEngine_1_0_0Test ex
buf.flip();
_protocolEngine_1_0_0.received(buf);
- verify(_virtualHost,
never()).registerConnection(any(Connection.class));
+ verify(_virtualHost,
never()).registerConnection(any(AMQPConnection.class));
verify(_networkConnection).close();
}
@@ -222,7 +223,7 @@ public class ProtocolEngine_1_0_0Test ex
buf.flip();
_protocolEngine_1_0_0.received(buf);
- verify(_virtualHost).registerConnection(any(Connection.class));
+ verify(_virtualHost).registerConnection(any(AMQPConnection.class));
AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal)
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
assertNotNull(authPrincipal);
assertEquals(authPrincipal, new AuthenticatedPrincipal(principal));
@@ -264,7 +265,7 @@ public class ProtocolEngine_1_0_0Test ex
buf.flip();
_protocolEngine_1_0_0.received(buf);
- verify(_virtualHost).registerConnection(any(Connection.class));
+ verify(_virtualHost).registerConnection(any(AMQPConnection.class));
AuthenticatedPrincipal principal = (AuthenticatedPrincipal)
_connection.getUnderlyingConnection().getAuthorizedPrincipal();
assertNotNull(principal);
assertEquals(principal, new
AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
@@ -273,8 +274,8 @@ public class ProtocolEngine_1_0_0Test ex
private void createEngine(final boolean useSASL, Transport transport)
{
- _protocolEngine_1_0_0 = new ProtocolEngine_1_0_0(_networkConnection,
- _broker, 1, _port,
transport, new AggregateTicker(),
+ _protocolEngine_1_0_0 = new AMQPConnection_1_0(_broker,
_networkConnection,
+ _port, transport, 1,
new AggregateTicker(),
useSASL);
}
Modified:
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
(original)
+++
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
Sat Jun 27 21:13:25 2015
@@ -993,7 +993,7 @@ class ManagementNode implements MessageS
}
@Override
- public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+ public boolean verifySessionAccess(final AMQSessionModel<?> session)
{
return true;
}
Modified:
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
(original)
+++
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
Sat Jun 27 21:13:25 2015
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
+import java.lang.reflect.WildcardType;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -665,6 +666,14 @@ public class RestServlet extends Abstrac
return getRawType(bounds[0]);
}
}
+ else if(t instanceof WildcardType)
+ {
+ Type[] upperBounds = ((WildcardType)t).getUpperBounds();
+ if(upperBounds.length == 1)
+ {
+ return getRawType(upperBounds[0]);
+ }
+ }
throw new ServerScopedRuntimeException("Unable to process type when
constructing configuration model: " + t);
}
Modified:
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
(original)
+++
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Sat Jun 27 21:13:25 2015
@@ -45,8 +45,7 @@ import org.eclipse.jetty.util.ssl.SslCon
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketHandler;
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.transport.ProtocolEngineFactory;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -67,7 +66,7 @@ class WebSocketProvider implements Accep
private final AmqpPort<?> _port;
private final Set<Protocol> _supported;
private final Protocol _defaultSupportedProtocolReply;
- private final ProtocolEngineFactory _factory;
+ private final MultiVersionProtocolEngineFactory _factory;
private Server _server;
WebSocketProvider(final Transport transport,
@@ -202,7 +201,7 @@ class WebSocketProvider implements Accep
private final Certificate _userCertificate;
private Connection _connection;
private final Transport _transport;
- private ProtocolEngine _engine;
+ private MultiVersionProtocolEngine _engine;
private AmqpWebSocket(final Transport transport,
final SocketAddress localAddress,
@@ -231,7 +230,7 @@ class WebSocketProvider implements Accep
final ConnectionWrapper connectionWrapper =
new ConnectionWrapper(connection, _localAddress,
_remoteAddress);
connectionWrapper.setPeerCertificate(_userCertificate);
- _engine.setNetworkConnection(connectionWrapper,
connectionWrapper.getSender());
+ _engine.setNetworkConnection(connectionWrapper);
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
Sat Jun 27 21:13:25 2015
@@ -227,7 +227,7 @@ public class AMQConnectionDelegate_0_10
_conn.setUsername(_qpidConnection.getUserID());
_conn.setMaximumChannelCount(_qpidConnection.getChannelMax());
_conn.getFailoverPolicy().attainedConnection();
- _conn.logConnected(_qpidConnection.getLocalAddress(),
_qpidConnection.getRemoteAddress());
+ _conn.logConnected(_qpidConnection.getLocalAddress(),
_qpidConnection.getRemoteSocketAddress());
_conn.setConnectionSettings(conSettings);
}
catch (ProtocolVersionException pe)
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Sat Jun 27 21:13:25 2015
@@ -65,7 +65,6 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
Sat Jun 27 21:13:25 2015
@@ -736,7 +736,7 @@ public class Connection extends Connecti
}
}
- public SocketAddress getRemoteAddress()
+ public SocketAddress getRemoteSocketAddress()
{
return _remoteAddress;
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
Sat Jun 27 21:13:25 2015
@@ -29,7 +29,6 @@ import static org.apache.qpid.transport.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.FrameSizeObserver;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
Sat Jun 27 21:13:25 2015
@@ -31,7 +31,6 @@ import javax.net.ssl.SSLSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.network.NetworkConnection;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
Sat Jun 27 21:13:25 2015
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.Atomi
import javax.net.ssl.SSLSocket;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.Ticker;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
Sat Jun 27 21:13:25 2015
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.transport.network.security;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
Sat Jun 27 21:13:25 2015
@@ -26,7 +26,6 @@ import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
Sat Jun 27 21:13:25 2015
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
import javax.security.sasl.SaslException;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.util.Logger;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
Sat Jun 27 21:13:25 2015
@@ -28,7 +28,6 @@ import javax.net.ssl.SSLEngineResult.Han
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.security.SSLStatus;
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
Sat Jun 27 21:13:25 2015
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.server.protocol;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.net.InetSocketAddress;
@@ -34,12 +32,8 @@ import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.*;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.ByteBufferSender;
@@ -126,79 +120,6 @@ public class MultiVersionProtocolEngineF
};
- private byte[] getAmqpHeader(final Protocol version)
- {
- switch(version)
- {
- case AMQP_0_8:
- return AMQP_0_8_HEADER;
- case AMQP_0_9:
- return AMQP_0_9_HEADER;
- case AMQP_0_9_1:
- return AMQP_0_9_1_HEADER;
- case AMQP_0_10:
- return AMQP_0_10_HEADER;
- case AMQP_1_0:
- return AMQP_1_0_0_HEADER;
- default:
- fail("unknown AMQP version, appropriate header must be added
for new protocol version");
- return null;
- }
- }
-
- /**
- * Test to verify that connections established using a
MultiVersionProtocolEngine are assigned
- * IDs from a common sequence, independent of the protocol version under
use.
- */
- public void testDifferentProtocolVersionsShareCommonIDNumberingSequence()
- {
- Set<Protocol> protocols = getAllAMQPProtocols();
-
- SubjectCreator subjectCreator = mock(SubjectCreator.class);
-
- AuthenticationProvider<?> authProvider =
mock(AuthenticationProvider.class);
- when(authProvider.getSubjectCreator(false)).thenReturn(subjectCreator);
-
- AmqpPort port = mock(AmqpPort.class);
-
when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
- when(port.getContextValue(eq(Integer.class),
eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
- when(port.getAuthenticationProvider()).thenReturn(authProvider);
-
- TaskExecutor childExecutor = _broker.getChildExecutor();
- when(port.getChildExecutor()).thenReturn(childExecutor);
- when(port.getCategoryClass()).thenReturn(Port.class);
- when(port.getModel()).thenReturn(BrokerModel.getInstance());
-
- when(port.getContextValue(eq(Long.class),
eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l);
- MultiVersionProtocolEngineFactory factory =
- new MultiVersionProtocolEngineFactory(_broker, protocols, null,
port,
- org.apache.qpid.server.model.Transport.TCP);
-
- //create a dummy to retrieve the 'current' ID number
- long previousId =
factory.newProtocolEngine(mock(SocketAddress.class)).getConnectionId();
-
- //create a protocol engine and send the AMQP header for all supported
AMQP verisons,
- //ensuring the ID assigned increases as expected
- for(Protocol protocol : protocols)
- {
- long expectedID = previousId + 1;
- byte[] header = getAmqpHeader(protocol);
- assertNotNull("protocol header should not be null", header);
-
- ProtocolEngine engine = factory.newProtocolEngine(null);
- TestNetworkConnection conn = new TestNetworkConnection();
- engine.setNetworkConnection(conn, conn.getSender());
- assertEquals("ID did not increment as expected", expectedID,
engine.getConnectionId());
-
- //actually feed in the AMQP header for this protocol version, and
ensure the ID remains consistent
- engine.received(ByteBuffer.wrap(header));
- assertEquals("ID was not as expected following receipt of the AMQP
version header", expectedID, engine.getConnectionId());
-
- previousId = expectedID;
- engine.closed();
- }
- }
-
protected Set<Protocol> getAllAMQPProtocols()
{
Set<Protocol> protocols = EnumSet.allOf(Protocol.class);
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
Sat Jun 27 21:13:25 2015
@@ -59,7 +59,6 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
@@ -69,6 +68,7 @@ import org.apache.qpid.server.queue.Last
import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.server.queue.PriorityQueueImpl;
import org.apache.qpid.server.queue.StandardQueueImpl;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -676,8 +676,8 @@ public class VirtualHostMessageStoreTest
queueArguments.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
queueArguments.put(Queue.EXCLUSIVE, exclusive ?
ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE);
AMQSessionModel sessionModel = mock(AMQSessionModel.class);
- AMQConnectionModel connectionModel = mock(AMQConnectionModel.class);
- when(sessionModel.getConnectionModel()).thenReturn(connectionModel);
+ AMQPConnection connectionModel = mock(AMQPConnection.class);
+ when(sessionModel.getAMQPConnection()).thenReturn(connectionModel);
when(connectionModel.getRemoteContainerName()).thenReturn(queueOwner);
SessionPrincipal principal = new SessionPrincipal(sessionModel);
AMQQueue<?> queue = Subject.doAs(new Subject(true,
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostLoggerRestTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostLoggerRestTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostLoggerRestTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostLoggerRestTest.java
Sat Jun 27 21:13:25 2015
@@ -29,7 +29,6 @@ import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.apache.qpid.server.logging.VirtualHostFileLogger;
-import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHostLogger;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]