Author: robbie
Date: Thu Jan 27 11:18:39 2011
New Revision: 1064084
URL: http://svn.apache.org/viewvc?rev=1064084&view=rev
Log:
QPID-3021: set the session/connection actor when the connection recieves new
events, ensure the correct thread logs close
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
Thu Jan 27 11:18:39 2011
@@ -41,7 +41,7 @@ public class ConfigurationFileApplicatio
public void close()
{
//Set the Actor for Broker Shutdown
- CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
+ CurrentActor.set(new BrokerActor(_rootMessageLogger));
try
{
super.close();
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
Thu Jan 27 11:18:39 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
import java.text.MessageFormat;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -39,11 +40,13 @@ import org.apache.qpid.transport.Connect
import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ProtocolEvent;
public class ServerConnection extends Connection implements
AMQConnectionModel, LogSubject
{
private ConnectionConfig _config;
private Runnable _onOpenTask;
+ private AtomicBoolean _logClosed = new AtomicBoolean(false);
private LogActor _actor = GenericActor.getInstance(this);
public ServerConnection()
@@ -73,6 +76,14 @@ public class ServerConnection extends Co
if (state == State.CLOSED)
{
+ logClosed();
+ }
+ }
+
+ protected void logClosed()
+ {
+ if(_logClosed.compareAndSet(false, true))
+ {
CurrentActor.get().message(this, ConnectionMessages.CLOSE());
}
}
@@ -135,13 +146,36 @@ public class ServerConnection extends Co
((ServerSession)session).close();
}
- public String toLogString() {
+ @Override
+ public void received(ProtocolEvent event)
+ {
+ ServerSession channel = (ServerSession) getSession(event.getChannel());
+ LogActor channelActor = null;
+
+ if (channel != null)
+ {
+ channelActor = channel.getLogActor();
+ }
+
+ CurrentActor.set(channelActor == null ? _actor : channelActor);
+ try
+ {
+ super.received(event);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
+
+ public String toLogString()
+ {
boolean hasVirtualHost = (null != this.getVirtualHost());
boolean hasPrincipal = (null != getAuthorizationID());
if (hasPrincipal && hasVirtualHost)
{
- return " [" +
+ return "[" +
MessageFormat.format(CONNECTION_FORMAT,
getConnectionId(),
getClientId(),
@@ -151,7 +185,7 @@ public class ServerConnection extends Co
}
else if (hasPrincipal)
{
- return " [" +
+ return "[" +
MessageFormat.format(USER_FORMAT,
getConnectionId(),
getClientId(),
@@ -161,7 +195,7 @@ public class ServerConnection extends Co
}
else
{
- return " [" +
+ return "[" +
MessageFormat.format(SOCKET_FORMAT,
getConnectionId(),
getConfig().getAddress())
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
Thu Jan 27 11:18:39 2011
@@ -84,7 +84,22 @@ public class ServerConnectionDelegate ex
}
- @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+ @Override
+ public void connectionClose(Connection conn, ConnectionClose close)
+ {
+ try
+ {
+ ((ServerConnection) conn).logClosed();
+ }
+ finally
+ {
+ super.connectionClose(conn, close);
+ }
+
+ }
+
+ @Override
+ public void connectionOpen(Connection conn, ConnectionOpen open)
{
ServerConnection sconn = (ServerConnection) conn;
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
Thu Jan 27 11:18:39 2011
@@ -32,6 +32,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
@@ -57,7 +58,6 @@ import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.Session.State;
import java.lang.ref.WeakReference;
import java.security.Principal;
@@ -81,6 +81,7 @@ public class ServerSession extends Sessi
private final UUID _id;
private ConnectionConfig _connectionConfig;
private long _createTime = System.currentTimeMillis();
+ private LogActor _actor = GenericActor.getInstance(this);
public static interface MessageDispositionChangeListener
{
@@ -130,7 +131,7 @@ public class ServerSession extends Sessi
if (state == State.OPEN)
{
-
GenericActor.getInstance(this).message(ChannelMessages.CREATE());
+ _actor.message(ChannelMessages.CREATE());
}
}
@@ -595,6 +596,11 @@ public class ServerSession extends Sessi
return getConnection().getClientId();
}
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
+
public LogSubject getLogSubject()
{
return (LogSubject) this;
@@ -603,7 +609,7 @@ public class ServerSession extends Sessi
@Override
public String toLogString()
{
- return " [" +
+ return "[" +
MessageFormat.format(CHANNEL_FORMAT,
getConnection().getConnectionId(),
getClientID(),
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
Thu Jan 27 11:18:39 2011
@@ -1223,7 +1223,6 @@ public class ServerSessionDelegate exten
@Override
public void closed(Session session)
{
- super.closed(session);
for(Subscription_0_10 sub : getSubscriptions(session))
{
((ServerSession)session).unregister(sub);
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
Thu Jan 27 11:18:39 2011
@@ -434,7 +434,7 @@ public class Connection extends Connecti
}
}
- Session getSession(int channel)
+ protected Session getSession(int channel)
{
synchronized (lock)
{
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
Thu Jan 27 11:18:39 2011
@@ -52,7 +52,6 @@ public class IoNetworkTransport implemen
private long timeout = 60000;
private ConnectionSettings settings;
- @Override
public void init(ConnectionSettings settings)
{
try
@@ -84,20 +83,17 @@ public class IoNetworkTransport implemen
}
}
- @Override
public void receiver(Receiver<ByteBuffer> delegate)
{
receiver = new IoReceiver(this, delegate,
2*settings.getReadBufferSize() , timeout);
}
- @Override
public Sender<ByteBuffer> sender()
{
return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
}
-
- @Override
+
public void close()
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]