Author: aidan
Date: Wed Sep 16 10:07:44 2009
New Revision: 815705

URL: http://svn.apache.org/viewvc?rev=815705&view=rev
Log:
QPID-2106: Don't close connections if the broker has asked it to close and
there's still stuff to process. Let the cleanup thread do that so that publishes
which are denied don't result in instant connection death.


Modified:
    
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
    
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java

Modified: 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
 (original)
+++ 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
 Wed Sep 16 10:07:44 2009
@@ -44,6 +44,14 @@
     {
 
     }
+    
+    public void expireClosedChannels()
+    {
+        for (AMQProtocolSession connection : _registry)
+        {
+            connection.closeIfLingeringClosedChannels();
+        }
+    }
 
     /** Close all of the currently open connections. */
     public void close() throws AMQException

Modified: 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
 (original)
+++ 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
 Wed Sep 16 10:07:44 2009
@@ -29,6 +29,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
@@ -135,7 +137,7 @@
     private FieldTable _clientProperties;
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
 
-    private List<Integer> _closingChannelsList = new 
CopyOnWriteArrayList<Integer>();
+    private Map<Integer, Long> _closingChannelsList = new 
ConcurrentHashMap<Integer, Long>();
     private ProtocolOutputConverter _protocolOutputConverter;
     private Principal _authorizedID;
     private MethodDispatcher _dispatcher;
@@ -293,12 +295,8 @@
                 }
                 else
                 {
-                    if (_logger.isInfoEnabled())
-                    {
-                        _logger.info("Channel[" + channelId + "] awaiting 
closure. Should close socket as client did not close-ok :" + frame);
-                    }
-
-                    closeProtocolSession();
+                    // The channel has been told to close, we don't process 
any more frames until
+                    // it's closed. 
                     return;
                 }
             }
@@ -513,7 +511,7 @@
 
     public boolean channelAwaitingClosure(int channelId)
     {
-        return !_closingChannelsList.isEmpty() && 
_closingChannelsList.contains(channelId);
+        return !_closingChannelsList.isEmpty() && 
_closingChannelsList.containsKey(channelId);
     }
 
     public void addChannel(AMQChannel channel) throws AMQException
@@ -525,7 +523,7 @@
 
         final int channelId = channel.getChannelId();
 
-        if (_closingChannelsList.contains(channelId))
+        if (_closingChannelsList.containsKey(channelId))
         {
             throw new AMQException("Session is marked awaiting channel close");
         }
@@ -632,7 +630,7 @@
 
     private void markChannelAwaitingCloseOk(int channelId)
     {
-        _closingChannelsList.add(channelId);
+        _closingChannelsList.put(channelId, System.currentTimeMillis());
     }
 
     /**
@@ -1023,7 +1021,19 @@
     {
         return (_clientVersion == null) ? null : _clientVersion.toString();
     }
-    
-    
+
+    @Override
+    public void closeIfLingeringClosedChannels()
+    {
+        for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
+        {
+            if (id.getValue() + 30000 > System.currentTimeMillis())
+            {
+                // We have a channel that we closed 30 seconds ago. Client's 
dead, kill the connection
+                _logger.error("Closing connection as channel was closed more 
than 30 seconds ago and no ChannelCloseOk has been processed");
+                closeProtocolSession();
+            }
+        }
+    }
     
 }

Modified: 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 (original)
+++ 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 Wed Sep 16 10:07:44 2009
@@ -225,5 +225,7 @@
     void commitTransactions(AMQChannel channel) throws AMQException;
 
     List<AMQChannel> getChannels();
+
+    void closeIfLingeringClosedChannels();
     
 }

Modified: 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
 (original)
+++ 
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
 Wed Sep 16 10:07:44 2009
@@ -267,6 +267,14 @@
             _houseKeepingTimer.scheduleAtFixedRate(new 
RemoveExpiredMessagesTask(),
                                                    period / 2,
                                                    period);
+            
+            class ForceChannelClosuresTask extends TimerTask
+            {
+                public void run()
+                {
+                    _connectionRegistry.expireClosedChannels();
+                }
+            }
         }
     }
     



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to