Author: kwall
Date: Sun Jul 26 19:39:00 2015
New Revision: 1692750
URL: http://svn.apache.org/r1692750
Log:
QPID-6655: [Java Broker] Refactor slow authentication detection to use a ticker
* Also added system tests around connection negotiation
* Prevented a decoding exception on the 0-10 path from leaving a connection open
Added:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
Sun Jul 26 19:39:00 2015
@@ -36,12 +36,17 @@ import javax.security.auth.Subject;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
@@ -54,12 +59,15 @@ import org.apache.qpid.server.util.Actio
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
public abstract class AbstractAMQPConnection<C extends
AbstractAMQPConnection<C>>
extends AbstractConfiguredObject<C>
implements ProtocolEngine, AMQPConnection<C>
{
+ private static final Logger _logger =
LoggerFactory.getLogger(AbstractAMQPConnection.class);
+
private final Broker<?> _broker;
private final NetworkConnection _network;
private final AmqpPort<?> _port;
@@ -80,6 +88,8 @@ public abstract class AbstractAMQPConnec
private final SettableFuture<Void> _transportClosedFuture =
SettableFuture.create();
private final SettableFuture<Void> _modelClosedFuture =
SettableFuture.create();
private final AtomicBoolean _modelClosing = new AtomicBoolean();
+ private volatile long _lastReadTime;
+ private volatile long _lastWriteTime;
public AbstractAMQPConnection(Broker<?> broker,
NetworkConnection network,
@@ -132,6 +142,16 @@ public abstract class AbstractAMQPConnec
return attributes;
}
+ @Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ long maxAuthDelay = _port.getContextValue(Long.class,
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
+ _aggregateTicker.addTicker(new SlowConnectionOpenTicker(maxAuthDelay));
+ _lastReadTime = _lastWriteTime = getCreatedTime();
+
+ }
+
public final Broker<?> getBroker()
{
return _broker;
@@ -158,11 +178,33 @@ public abstract class AbstractAMQPConnec
return _aggregateTicker;
}
- public long getLastIoTime()
+ public final long getLastIoTime()
{
return Math.max(getLastReadTime(), getLastWriteTime());
}
+ @Override
+ public final long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ public final void updateLastReadTime()
+ {
+ _lastReadTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public final long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
+ public final void updateLastWriteTime()
+ {
+ _lastWriteTime = System.currentTimeMillis();
+ }
+
public final long getConnectionId()
{
return _connectionId;
@@ -352,8 +394,6 @@ public abstract class AbstractAMQPConnec
getVirtualHost().registerConnection(this);
}
-
-
@Override
public boolean isIncoming()
{
@@ -413,11 +453,6 @@ public abstract class AbstractAMQPConnec
}
@Override
- protected void onClose()
- {
- }
-
- @Override
public <C extends ConfiguredObject> ListenableFuture<C>
addChildAsync(Class<C> childClass, Map<String, Object> attributes,
ConfiguredObject... otherParents)
{
if(childClass == Session.class)
@@ -473,4 +508,45 @@ public abstract class AbstractAMQPConnec
{
_transportClosedFuture.set(null);
}
+
+ protected abstract EventLogger getEventLogger();
+
+ private class SlowConnectionOpenTicker implements Ticker
+ {
+ private final long _allowedTime;
+
+ public SlowConnectionOpenTicker(long timeoutTime)
+ {
+ _allowedTime = timeoutTime;
+ }
+
+ @Override
+ public int getTimeToNextTick(final long currentTime)
+ {
+ final int timeToNextTick = (int) (getCreatedTime() + _allowedTime
- currentTime);
+ return timeToNextTick;
+ }
+
+ @Override
+ public int tick(final long currentTime)
+ {
+ int nextTick = getTimeToNextTick(currentTime);
+ if(nextTick <= 0)
+ {
+ if (getAuthorizedPrincipal() == null)
+ {
+ _logger.warn("Connection has taken more than {} ms to
establish identity. Closing as possible DoS.",
+ _allowedTime);
+ getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
+ }
+ else
+ {
+ _aggregateTicker.removeTicker(this);
+ }
+ }
+ return nextTick;
+ }
+ }
+
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Sun Jul 26 19:39:00 2015
@@ -421,11 +421,12 @@ public class MultiVersionProtocolEngine
//delegate. Also save most recent supported version and
associated reply header bytes
for(int i = 0; newDelegate == null && i < _creators.length;
i++)
{
- if(_supported.contains(_creators[i].getVersion()))
+ final ProtocolEngineCreator creator = _creators[i];
+ if(_supported.contains(creator.getVersion()))
{
- supportedReplyBytes =
_creators[i].getHeaderIdentifier();
- supportedReplyVersion = _creators[i].getVersion();
- byte[] compareBytes =
_creators[i].getHeaderIdentifier();
+ supportedReplyBytes = creator.getHeaderIdentifier();
+ supportedReplyVersion = creator.getVersion();
+ byte[] compareBytes = creator.getHeaderIdentifier();
boolean equal = true;
for(int j = 0; equal && j<compareBytes.length; j++)
{
@@ -433,21 +434,21 @@ public class MultiVersionProtocolEngine
}
if(equal)
{
- newDelegate =
_creators[i].newProtocolEngine(_broker,
-
_network, _port, _transport, _id,
-
_aggregateTicker);
- if(newDelegate == null &&
_creators[i].getSuggestedAlternativeHeader() != null)
+ newDelegate = creator.newProtocolEngine(_broker,
+ _network,
_port, _transport, _id,
+
_aggregateTicker);
+ if(newDelegate == null &&
creator.getSuggestedAlternativeHeader() != null)
{
- defaultSupportedReplyBytes =
_creators[i].getSuggestedAlternativeHeader();
+ defaultSupportedReplyBytes =
creator.getSuggestedAlternativeHeader();
}
}
}
//If there is a configured default reply to an unsupported
version initiation,
//then save the associated reply header bytes when we
encounter them
- if(defaultSupportedReplyBytes == null &&
_defaultSupportedReply != null && _creators[i].getVersion() ==
_defaultSupportedReply)
+ if(defaultSupportedReplyBytes == null &&
_defaultSupportedReply != null && creator.getVersion() ==
_defaultSupportedReply)
{
- defaultSupportedReplyBytes =
_creators[i].getHeaderIdentifier();
+ defaultSupportedReplyBytes =
creator.getHeaderIdentifier();
}
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
Sun Jul 26 19:39:00 2015
@@ -35,20 +35,23 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Consumer;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Constant;
@@ -64,11 +67,8 @@ public class AMQPConnection_0_10 extends
private final NetworkConnection _network;
- private ServerConnection _connection;
+ private final ServerConnection _connection;
- private long _createTime = System.currentTimeMillis();
- private volatile long _lastReadTime = _createTime;
- private volatile long _lastWriteTime = _createTime;
private volatile boolean _transportBlockedForWriting;
private final AtomicReference<Thread> _messageAssignmentSuspended = new
AtomicReference<>();
@@ -164,7 +164,7 @@ public class AMQPConnection_0_10 extends
@Override
public void send(ByteBuffer msg)
{
- _lastWriteTime = System.currentTimeMillis();
+ updateLastWriteTime();
sender.send(msg);
}
@@ -183,18 +183,6 @@ public class AMQPConnection_0_10 extends
};
}
- @Override
- public long getLastReadTime()
- {
- return _lastReadTime;
- }
-
- @Override
- public long getLastWriteTime()
- {
- return _lastWriteTime;
- }
-
public void received(final ByteBuffer buf)
{
Subject.doAs(_connection.getAuthorizedSubject(), new
PrivilegedAction<Object>()
@@ -202,22 +190,28 @@ public class AMQPConnection_0_10 extends
@Override
public Object run()
{
- _lastReadTime = System.currentTimeMillis();
- if (_connection.getAuthorizedPrincipal() == null &&
- (_lastReadTime - _createTime) >
_connection.getPort().getContextValue(Long.class,
-
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
+ updateLastReadTime();
+ try
{
-
- _logger.warn("Connection has taken more than "
- + _connection.getPort()
- .getContextValue(Long.class,
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
- + "ms to establish identity. Closing as
possible DoS.");
-
_connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
- _network.close();
-
+ _inputHandler.received(buf);
+ _connection.receivedComplete();
+ }
+ catch (IllegalArgumentException | IllegalStateException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
}
- _inputHandler.received(buf);
- _connection.receivedComplete();
+ catch (StoreException e)
+ {
+ if (getVirtualHost().getState() == State.ACTIVE)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+ else
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ }
+
return null;
}
});
@@ -337,7 +331,7 @@ public class AMQPConnection_0_10 extends
public void closeSessionAsync(final AMQSessionModel<?> session,
final AMQConstant cause, final String
message)
{
- _connection.closeSessionAsync((ServerSession)session, cause, message);
+ _connection.closeSessionAsync((ServerSession) session, cause, message);
}
public void block()
@@ -365,14 +359,15 @@ public class AMQPConnection_0_10 extends
_connection.unblock();
}
- public LogSubject getLogSubject()
+ public long getSessionCountLimit()
{
- return _connection.getLogSubject();
+ return _connection.getSessionCountLimit();
}
- public long getSessionCountLimit()
+ @Override
+ protected EventLogger getEventLogger()
{
- return _connection.getSessionCountLimit();
+ return _connection.getEventLogger();
}
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Sun Jul 26 19:39:00 2015
@@ -332,7 +332,7 @@ public class AMQChannel
@Override
public long getActivityTime()
{
- return _connection.getLastReceivedTime();
+ return _connection.getLastReadTime();
}
});
_txnStarts.incrementAndGet();
@@ -441,7 +441,7 @@ public class AMQChannel
final MessageMetaData messageMetaData =
new MessageMetaData(messagePublishInfo,
contentHeader,
-
getConnection().getLastReceivedTime());
+ getConnection().getLastReadTime());
final MessageHandle<MessageMetaData> handle =
_messageStore.addMessage(messageMetaData);
int bodyCount = _currentMessage.getBodyCount();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
Sun Jul 26 19:39:00 2015
@@ -41,7 +41,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -74,7 +73,6 @@ import org.apache.qpid.server.message.In
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Consumer;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
@@ -88,8 +86,6 @@ import org.apache.qpid.server.util.Conne
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.SenderClosedException;
-import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -119,7 +115,6 @@ public class AMQPConnection_0_8
private static final long CLOSE_OK_TIMEOUT = 10000l;
- private final long _creationTime;
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ProtocolEngine>> _workListener = new
AtomicReference<>();
@@ -139,7 +134,7 @@ public class AMQPConnection_0_8
*/
private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>();
- private AMQDecoder _decoder;
+ private final AMQDecoder _decoder;
private SaslServer _saslServer;
@@ -147,16 +142,14 @@ public class AMQPConnection_0_8
private ProtocolVersion _protocolVersion =
ProtocolVersion.getLatestSupportedVersion();
private final MethodRegistry _methodRegistry = new
MethodRegistry(_protocolVersion);
- private final List<Action<? super AMQPConnection_0_8>>
_connectionCloseTaskList =
- new CopyOnWriteArrayList<>();
private final Queue<Action<? super AMQPConnection_0_8>> _asyncTaskList =
new ConcurrentLinkedQueue<>();
- private Map<Integer, Long> _closingChannelsList = new
ConcurrentHashMap<>();
+ private final Map<Integer, Long> _closingChannelsList = new
ConcurrentHashMap<>();
private ProtocolOutputConverter _protocolOutputConverter;
- private Object _reference = new Object();
+ private final Object _reference = new Object();
private LogSubject _logSubject;
@@ -167,12 +160,9 @@ public class AMQPConnection_0_8
private final ByteBufferSender _sender;
private volatile boolean _deferFlush;
- private volatile long _lastReceivedTime = System.currentTimeMillis();
- private volatile long _lastWriteTime = System.currentTimeMillis();
private boolean _blocking;
private volatile boolean _closeWhenNoRoute;
- private boolean _authenticated;
private boolean _compressionSupported;
private int _messageCompressionThreshold;
private int _currentClassId;
@@ -215,9 +205,6 @@ public class AMQPConnection_0_8
}
});
_closeWhenNoRoute = getBroker().getConnection_closeWhenNoRoute();
-
- _creationTime = System.currentTimeMillis();
-
}
@Override
@@ -300,69 +287,27 @@ public class AMQPConnection_0_8
@Override
public Void run()
{
-
- final long arrivalTime = System.currentTimeMillis();
- if (!_authenticated &&
- (arrivalTime - _creationTime) >
getPort().getContextValue(Long.class,
-
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
- {
- _logger.warn("Connection has taken more than "
- + getPort().getContextValue(Long.class,
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
- + "ms to establish identity. Closing as
possible DoS.");
- getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
- closeNetworkConnection();
- }
- _lastReceivedTime = arrivalTime;
+ updateLastReadTime();
try
{
_decoder.decodeBuffer(msg);
receivedCompleteAllChannels();
}
- catch (ConnectionScopedRuntimeException e)
+ catch (TransportException | AMQFrameDecodingException |
IOException e)
{
_logger.error("Unexpected exception", e);
- closeNetworkConnection();
- }
- catch (AMQProtocolVersionException e)
- {
- _logger.error("Unexpected protocol version", e);
- closeNetworkConnection();
- }
- catch (SenderClosedException e)
- {
- _logger.debug("Sender was closed abruptly, closing
network.", e);
- closeNetworkConnection();
- }
- catch (SenderException e)
- {
- _logger.info("Unexpected exception on send, closing
network.", e);
- closeNetworkConnection();
- }
- catch (TransportException e)
- {
- _logger.error("Unexpected transport exception", e);
- closeNetworkConnection();
- }
- catch (AMQFrameDecodingException e)
- {
- _logger.error("Frame decoding", e);
- closeNetworkConnection();
- }
- catch (IOException e)
- {
- _logger.error("I/O Exception", e);
- closeNetworkConnection();
+ throw new ConnectionScopedRuntimeException(e);
}
catch (StoreException e)
{
if (_virtualHost.getState() == State.ACTIVE)
{
- throw e;
+ throw new ServerScopedRuntimeException(e);
}
else
{
- _logger.error("Store Exception ignored as virtual host
no longer active", e);
+ throw new ConnectionScopedRuntimeException(e);
}
}
return null;
@@ -496,8 +441,7 @@ public class AMQPConnection_0_8
}
- final long time = System.currentTimeMillis();
- _lastWriteTime = time;
+ updateLastWriteTime();
if(!_deferFlush)
{
@@ -868,7 +812,6 @@ public class AMQPConnection_0_8
throw new IllegalArgumentException("authorizedSubject cannot be
null");
}
- _authenticated = true;
getSubject().getPrincipals().addAll(authorizedSubject.getPrincipals());
getSubject().getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials());
getSubject().getPublicCredentials().addAll(authorizedSubject.getPublicCredentials());
@@ -953,11 +896,6 @@ public class AMQPConnection_0_8
writeFrame(HeartbeatBody.FRAME);
}
- public long getLastReceivedTime()
- {
- return _lastReceivedTime;
- }
-
public long getSessionCountLimit()
{
return getMaximumNumberOfChannels();
@@ -1514,18 +1452,6 @@ public class AMQPConnection_0_8
return _reference;
}
- @Override
- public long getLastReadTime()
- {
- return _lastReceivedTime;
- }
-
- @Override
- public long getLastWriteTime()
- {
- return _lastWriteTime;
- }
-
public boolean isCloseWhenNoRoute()
{
return _closeWhenNoRoute;
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
Sun Jul 26 19:39:00 2015
@@ -61,6 +61,7 @@ public class AMQProtocolEngineTest exten
when(_port.getChildExecutor()).thenReturn(childExecutor);
when(_port.getCategoryClass()).thenReturn(Port.class);
when(_port.getModel()).thenReturn(BrokerModel.getInstance());
+ when(_port.getContextValue(Long.class,
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)).thenReturn(2500l);
_network = mock(NetworkConnection.class);
_transport = Transport.TCP;
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
Sun Jul 26 19:39:00 2015
@@ -53,7 +53,7 @@ import org.apache.qpid.amqp_1_0.type.Sym
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
@@ -72,6 +72,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -82,11 +83,9 @@ public class AMQPConnection_1_0 extends
public static Logger LOGGER =
LoggerFactory.getLogger(AMQPConnection_1_0.class);
- public static final long CLOSE_REPONSE_TIMEOUT = 10000l;
+ public static final long CLOSE_RESPONSE_TIMEOUT = 10000l;
+ private final Broker<?> _broker;
- private volatile long _lastReadTime;
- private volatile long _lastWriteTime;
- private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _endpoint;
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ProtocolEngine>> _workListener = new
AtomicReference<>();
@@ -154,6 +153,7 @@ public class AMQPConnection_1_0 extends
final boolean useSASL)
{
super(broker, network, port, transport, id, aggregateTicker);
+ _broker = broker;
_connection = createConnection(broker, network, port, transport, id,
useSASL);
_connection.setAmqpConnection(this);
@@ -346,7 +346,7 @@ public class AMQPConnection_1_0 extends
{
try
{
- _lastReadTime = System.currentTimeMillis();
+ updateLastReadTime();
if(RAW_LOGGER.isDebugEnabled())
{
ByteBuffer dup = msg.duplicate();
@@ -495,12 +495,6 @@ public class AMQPConnection_1_0 extends
}
- public long getCreateTime()
- {
- return _createTime;
- }
-
-
public boolean canSend()
{
return true;
@@ -519,7 +513,7 @@ public class AMQPConnection_1_0 extends
synchronized (_sendLock)
{
- _lastWriteTime = System.currentTimeMillis();
+ updateLastWriteTime();
if (FRAME_LOGGER.isDebugEnabled())
{
FRAME_LOGGER.debug("SEND["
@@ -573,21 +567,11 @@ public class AMQPConnection_1_0 extends
public void close()
{
- getAggregateTicker().addTicker(new
ConnectionClosingTicker(System.currentTimeMillis()+ CLOSE_REPONSE_TIMEOUT,
+ getAggregateTicker().addTicker(new
ConnectionClosingTicker(System.currentTimeMillis()+ CLOSE_RESPONSE_TIMEOUT,
getNetwork()));
}
- public long getLastReadTime()
- {
- return _lastReadTime;
- }
-
- public long getLastWriteTime()
- {
- return _lastWriteTime;
- }
-
@Override
public boolean isTransportBlockedForWriting()
{
@@ -684,14 +668,22 @@ public class AMQPConnection_1_0 extends
_connection.unblock();
}
- public LogSubject getLogSubject()
- {
- return _connection.getLogSubject();
- }
-
public long getSessionCountLimit()
{
return _connection.getSessionCountLimit();
}
+ @Override
+ protected EventLogger getEventLogger()
+ {
+ final VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ if (virtualHost != null)
+ {
+ return virtualHost.getEventLogger();
+ }
+ else
+ {
+ return _broker.getEventLogger();
+ }
+ }
}
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
Sun Jul 26 19:39:00 2015
@@ -42,8 +42,6 @@ import javax.jms.Session;
/**
* Tests the behaviour of the client when the Broker terminates client
connection
* by the Broker being shutdown gracefully or otherwise.
- *
- * @see ManagedConnectionMBeanTest
*/
public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase
{
Added:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1692750&view=auto
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
(added)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Sun Jul 26 19:39:00 2015
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.framing.ByteArrayDataInput;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.protocol.v0_10.ServerDisassembler;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.network.Frame;
+
+public class ProtocolNegotiationTest extends QpidBrokerTestCase
+{
+ private static final int SO_TIMEOUT = 5000;
+ public static final int AMQP_HEADER_LEN = 8;
+ private ProtocolVersion _expectedProtocolInit;
+
+    /**
+     * Runs the superclass set-up and then caches the {@link ProtocolVersion} that
+     * corresponds to the protocol reported by {@code getBrokerProtocol()}.
+     */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _expectedProtocolInit = convertProtocolToProtocolVersion(getBrokerProtocol());
+    }
+
+    /**
+     * Sends bytes that are not an AMQP protocol header and checks that the broker replies
+     * with {@link #AMQP_HEADER_LEN} bytes that decode to the expected protocol version,
+     * followed by end-of-stream.
+     */
+    public void testWrongProtocolHeaderSent_BrokerRespondsWithSupportedProtocol() throws Exception
+    {
+        try (Socket socket = new Socket())
+        {
+            socket.setSoTimeout(SO_TIMEOUT);
+
+            final InetSocketAddress brokerAddress = new InetSocketAddress("localhost", getPort());
+            _logger.debug("Making connection to {}", brokerAddress);
+
+            socket.connect(brokerAddress);
+
+            assertTrue("Expected socket to be connected", socket.isConnected());
+
+            // Anything that is not a valid protocol header will do here.
+            socket.getOutputStream().write("NOTANAMQPHEADER".getBytes());
+
+            final byte[] reply = new byte[AMQP_HEADER_LEN];
+            final int bytesRead = socket.getInputStream().read(reply);
+            assertEquals("Unexpected number of bytes available from socket", reply.length, bytesRead);
+
+            // Nothing further may follow the header: the next read must hit end-of-stream.
+            assertEquals("Expected end-of-stream from socket signifying socket closed)",
+                         -1,
+                         socket.getInputStream().read());
+
+            final ProtocolInitiation brokerHeader = new ProtocolInitiation(new ByteArrayDataInput(reply));
+
+            assertEquals("Unexpected protocol initialisation", _expectedProtocolInit, brokerHeader.checkVersion());
+        }
+    }
+
+
+    /**
+     * Connects without sending anything and checks that a read returns end-of-stream
+     * before the socket read timeout ({@link #SO_TIMEOUT}) expires.
+     */
+    public void testNoProtocolHeaderSent_BrokerClosesConnection() throws Exception
+    {
+        try (Socket socket = new Socket())
+        {
+            socket.setSoTimeout(SO_TIMEOUT);
+
+            final InetSocketAddress brokerAddress = new InetSocketAddress("localhost", getPort());
+            _logger.debug("Making connection to {}", brokerAddress);
+
+            socket.connect(brokerAddress);
+
+            assertTrue("Expected socket to be connected", socket.isConnected());
+
+            int firstByte = 0;
+            try
+            {
+                firstByte = socket.getInputStream().read();
+                _logger.debug("Read {}", firstByte);
+            }
+            catch (SocketTimeoutException ste)
+            {
+                // The read timed out, i.e. the connection was still open.
+                fail("Broker did not close connection with no activity within expected timeout");
+            }
+
+            assertEquals("Expected end-of-stream from socket signifying socket closed)", -1, firstByte);
+        }
+    }
+
+    /**
+     * Exchanges protocol headers with the broker and then only ever writes heartbeats.
+     * The test passes when a heartbeat write fails (reported by {@code writeHeartbeat}
+     * returning {@code false}) within three seconds, showing that the connection was dropped.
+     */
+    public void testNoConnectionOpenSent_BrokerClosesConnection() throws Exception
+    {
+        // NOTE(review): this property is set after setUp() has run; confirm the broker
+        // under test actually picks the value up.
+        setSystemProperty(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY, "1000");
+
+        try (Socket socket = new Socket())
+        {
+            socket.setSoTimeout(SO_TIMEOUT);
+
+            final ProtocolVersion protocolVersion = convertProtocolToProtocolVersion(getBrokerProtocol());
+            ProtocolInitiation pi = new ProtocolInitiation(protocolVersion);
+
+            final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getPort());
+            _logger.debug("Making connection to {}", inetSocketAddress);
+
+            socket.connect(inetSocketAddress);
+
+            assertTrue("Expected socket to be connected", socket.isConnected());
+
+            final DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
+            final InputStream inputStream = socket.getInputStream();
+
+            // write header
+            pi.writePayload(dataOutputStream);
+
+            // read header
+            byte[] receivedHeader = new byte[AMQP_HEADER_LEN];
+            int len = inputStream.read(receivedHeader);
+            assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len);
+
+            // Send heartbeat frames to simulate a client that, although active, fails to
+            // authenticate within the allowed period
+
+            long timeout = System.currentTimeMillis() + 3000;
+            boolean brokenPipe = false;
+            while (timeout > System.currentTimeMillis())
+            {
+                // No ';' after the condition: the block must only run when the write failed.
+                if (!writeHeartbeat(dataOutputStream))
+                {
+                    brokenPipe = true;
+                    break;
+                }
+            }
+            assertTrue("Expected pipe to become broken within "
+                       + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY + " timeout", brokenPipe);
+        }
+    }
+
+    /**
+     * Exchanges protocol headers with the broker, writes bytes that do not form a valid
+     * frame, and then checks that the connection is closed: the input stream must reach
+     * end-of-stream (or the socket be reset) before a read times out after
+     * {@link #SO_TIMEOUT} milliseconds.
+     */
+    public void testIllegalFrameSent_BrokerClosesConnection() throws Exception
+    {
+        try (Socket socket = new Socket())
+        {
+            socket.setSoTimeout(SO_TIMEOUT);
+
+            final ProtocolVersion protocolVersion = convertProtocolToProtocolVersion(getBrokerProtocol());
+            ProtocolInitiation pi = new ProtocolInitiation(protocolVersion);
+
+            final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getPort());
+            _logger.debug("Making connection to {}", inetSocketAddress);
+
+            socket.connect(inetSocketAddress);
+
+            assertTrue("Expected socket to be connected", socket.isConnected());
+
+            final DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
+            final InputStream inputStream = socket.getInputStream();
+
+            // write header
+            pi.writePayload(dataOutputStream);
+
+            // read header
+            byte[] receivedHeader = new byte[AMQP_HEADER_LEN];
+            int len = inputStream.read(receivedHeader);
+            assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len);
+
+            dataOutputStream.write("NOTANAMPQFRAME".getBytes());
+
+            // Without a check here the test would pass even if the connection stayed open.
+            // Discard anything still queued on the stream until it ends.
+            try
+            {
+                while (inputStream.read() != -1)
+                {
+                    // discard
+                }
+            }
+            catch (SocketTimeoutException ste)
+            {
+                fail("Broker did not close connection after receiving an illegal frame");
+            }
+            catch (SocketException expected)
+            {
+                // A reset socket is also accepted as evidence that the connection was closed.
+            }
+        }
+    }
+
+ private boolean writeHeartbeat(final DataOutputStream dataOutputStream)
+ throws IOException
+ {
+ final AtomicBoolean success = new AtomicBoolean(true);
+ if (isBroker010())
+ {
+ ConnectionHeartbeat heartbeat = new ConnectionHeartbeat();
+ ServerDisassembler serverDisassembler = new ServerDisassembler(new
ByteBufferSender()
+ {
+ @Override
+ public void send(final ByteBuffer msg)
+ {
+ try
+ {
+ dataOutputStream.write(msg.array(), msg.position(),
msg.remaining());
+ }
+ catch (SocketException se)
+ {
+
+ success.set(false);
+ }
+ catch(IOException e)
+ {
+ throw new RuntimeException("Unexpected IOException",
e);
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+ }
+
+ @Override
+ public void close()
+ {
+ }
+ }, Frame.HEADER_SIZE + 1);
+ serverDisassembler.command(null, heartbeat);
+ }
+ else
+ {
+ try
+ {
+ HeartbeatBody.FRAME.writePayload(dataOutputStream);
+ }
+ catch (SocketException se)
+ {
+ success.set(false);
+ }
+ }
+
+ return success.get();
+ }
+
+    /**
+     * Maps a broker {@link Protocol} constant onto the matching {@link ProtocolVersion}.
+     *
+     * @param p protocol to translate
+     * @return the corresponding protocol version
+     * @throws IllegalArgumentException if {@code p} is not one of the AMQP 0-8, 0-9,
+     *         0-9-1 or 0-10 constants
+     */
+    private ProtocolVersion convertProtocolToProtocolVersion(final Protocol p)
+    {
+        switch (p)
+        {
+            case AMQP_0_10:
+                return ProtocolVersion.v0_10;
+            case AMQP_0_9_1:
+                return ProtocolVersion.v0_91;
+            case AMQP_0_9:
+                return ProtocolVersion.v0_9;
+            case AMQP_0_8:
+                return ProtocolVersion.v0_8;
+            default:
+                throw new IllegalArgumentException("Unexpected " + p.name());
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]