Author: rgodfrey
Date: Thu Feb 18 09:41:53 2016
New Revision: 1731029
URL: http://svn.apache.org/viewvc?rev=1731029&view=rev
Log:
QPID-7079 : Add connection state logging to idle timeout
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1731029&r1=1731028&r2=1731029&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
Thu Feb 18 09:41:53 2016
@@ -201,7 +201,7 @@ public class AMQPConnection_0_10 extends
@Override
public Object run() // logs an IDLE_CLOSE event, closes _network and returns null; the '+' line adds the delegate's connection state to the logged text
{
-
_connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("", false));
+
_connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current
connection state: " + _connection.getConnectionDelegate().getState(), false));
_network.close(); // runs after the event message has been handed to the event logger
return null;
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1731029&r1=1731028&r2=1731029&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Thu Feb 18 09:41:53 2016
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.State;
@@ -66,6 +67,19 @@ public class ServerConnectionDelegate ex
private boolean _compressionSupported;
+ enum ConnectionState // handshake phase held in _state and exposed through getState()
+ {
+ INIT, // initial value of _state; required by init()
+ AWAIT_START_OK, // set by init(); required by connectionStartOk()
+ AWAIT_SECURE_OK, // set by secure() on AuthenticationStatus.CONTINUE; required by connectionSecureOk()
+ AWAIT_TUNE_OK, // set by secure() after the authorized subject is stored; required by connectionTuneOk()
+ AWAIT_OPEN, // set by connectionTuneOk(); required by connectionOpen()
+ OPEN // set by connectionOpen() before ConnectionOpenOk is invoked; required by sessionAttach()
+ }
+
+ private ConnectionState _state = ConnectionState.INIT;
+
+
public ServerConnectionDelegate(Broker<?> broker, String localFQDN,
SubjectCreator subjectCreator)
{
this(createConnectionProperties(broker),
Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
@@ -86,6 +100,30 @@ public class ServerConnectionDelegate ex
_maximumFrameSize = Math.min(0xffff, broker.getNetworkBufferSize());
}
+
+ public final ConnectionState getState() // current handshake state; read by AMQPConnection_0_10 when building its IDLE_CLOSE log message
+ {
+ return _state;
+ }
+
+
+ private void assertState(final ServerConnection conn, final
ConnectionState requiredState) // closes conn when _state differs from requiredState; does nothing when they match
+ {
+ if(_state != requiredState)
+ { // mismatch: send a FRAMING_ERROR connection-close carrying expected and actual state
+ conn.sendConnectionClose(ConnectionCloseCode.FRAMING_ERROR,
"Command Invalid expected "+requiredState+" but was "+_state);
+ conn.closeAndIgnoreFutureInput(); // NOTE(review): method then returns normally and nothing signals the failure, so callers in this diff continue handling the frame - confirm intended
+ }
+ }
+
+ @Override
+ public void init(final Connection conn, final ProtocolHeader hdr) // checks for INIT, delegates to super.init(), then records AWAIT_START_OK
+ {
+ assertState((ServerConnection)conn, ConnectionState.INIT); // conn is cast unchecked; presumably always a ServerConnection here - verify against callers
+ super.init(conn, hdr);
+ _state = ConnectionState.AWAIT_START_OK; // NOTE(review): also executed when the check above failed, unless one of the close calls throws - confirm intended
+ }
+
private static List<String> getFeatures(Broker<?> broker)
{
String brokerDisabledFeatures =
System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES);
@@ -134,6 +172,13 @@ public class ServerConnectionDelegate ex
}
+ @Override
+ public void connectionSecureOk(final Connection conn, final
ConnectionSecureOk ok) // checks for AWAIT_SECURE_OK, then hands the frame to the superclass handler
+ {
+ assertState((ServerConnection)conn, ConnectionState.AWAIT_SECURE_OK);
+ super.connectionSecureOk(conn, ok); // NOTE(review): still invoked after a failed state check, since assertState returns void - confirm intended
+ }
+
protected void secure(final SaslServer ss, final Connection conn, final
byte[] response)
{
final ServerConnection sconn = (ServerConnection) conn;
@@ -143,10 +188,12 @@ public class ServerConnectionDelegate ex
{
tuneAuthorizedConnection(sconn);
sconn.setAuthorizedSubject(authResult.getSubject());
+ _state = ConnectionState.AWAIT_TUNE_OK;
}
else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus()))
{
connectionAuthContinue(sconn, authResult.getChallenge());
+ _state = ConnectionState.AWAIT_SECURE_OK;
}
else
{
@@ -166,7 +213,7 @@ public class ServerConnectionDelegate ex
public void connectionOpen(Connection conn, ConnectionOpen open)
{
final ServerConnection sconn = (ServerConnection) conn;
-
+ assertState(sconn, ConnectionState.AWAIT_OPEN);
VirtualHost<?> vhost;
String vhostName;
if(open.hasVirtualHost())
@@ -219,6 +266,7 @@ public class ServerConnectionDelegate ex
}
sconn.setState(Connection.State.OPEN);
+ _state = ConnectionState.OPEN;
sconn.invoke(new ConnectionOpenOk(Collections.emptyList()));
}
else
@@ -234,6 +282,7 @@ public class ServerConnectionDelegate ex
public void connectionTuneOk(final Connection conn, final ConnectionTuneOk
ok)
{
ServerConnection sconn = (ServerConnection) conn;
+ assertState(sconn, ConnectionState.AWAIT_TUNE_OK);
int okChannelMax = ok.getChannelMax();
int okMaxFrameSize = ok.getMaxFrameSize();
@@ -295,6 +344,7 @@ public class ServerConnectionDelegate ex
setConnectionTuneOkChannelMax(sconn, okChannelMax);
conn.setMaxFrameSize(okMaxFrameSize);
+ _state = ConnectionState.AWAIT_OPEN;
}
@Override
@@ -339,6 +389,8 @@ public class ServerConnectionDelegate ex
@Override
public void sessionAttach(final Connection conn, final SessionAttach atc)
{
+ assertState((ServerConnection)conn, ConnectionState.OPEN);
+
final Session ssn;
if(isSessionNameUnique(atc.getName(), conn))
@@ -380,6 +432,7 @@ public class ServerConnectionDelegate ex
@Override
public void connectionStartOk(Connection conn, ConnectionStartOk ok)
{
+ assertState((ServerConnection)conn, ConnectionState.AWAIT_START_OK);
_clientProperties = ok.getClientProperties();
if(_clientProperties != null)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org