Author: trustin
Date: Thu Nov  8 02:49:29 2007
New Revision: 593125

URL: http://svn.apache.org/viewvc?rev=593125&view=rev
Log:
Fixed a problem that ReadThrottleFilter doesn't resume a session
* resumeOthers() revives suspended sessions periodically even if 
messageReceived event is not fired for the suspended sessions.

Modified:
    
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java?rev=593125&r1=593124&r2=593125&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
 Thu Nov  8 02:49:29 2007
@@ -34,6 +34,7 @@
 import org.apache.mina.filter.executor.AbstractExecutorFilter;
 import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.mina.util.CopyOnWriteMap;
+import org.slf4j.Logger;
 
 /**
  * An [EMAIL PROTECTED] IoFilter} that throttles incoming traffic to
@@ -64,6 +65,9 @@
     private static final Map<IoService, AtomicInteger> serviceBufferSizes =
         new CopyOnWriteMap<IoService, AtomicInteger>();
     
+    private static final Object globalResumeLock = new Object();
+    private static long lastGlobalResumeTime = 0;
+    
     /**
      * Returns the current amount of data in the buffer of the [EMAIL 
PROTECTED] ExecuorFilter}
      * for all [EMAIL PROTECTED] IoSession} whose [EMAIL PROTECTED] 
IoFilterChain} has been configured by
@@ -104,9 +108,6 @@
     private final AttributeKey STATE =
         new AttributeKey(ReadThrottleFilter.class, "state");
 
-    private final Object logLock = new Object();
-    private long lastLogTime = 0;
-    
     private volatile ReadThrottlePolicy policy;
     private final MessageSizeEstimator messageSizeEstimator;
     
@@ -365,7 +366,8 @@
 
     private void enter(IoSession session, int size) {
         State state = getState(session);
-        
+        Logger logger = IoSessionLogger.getLogger(session, getClass());
+
         int globalBufferSize = 
ReadThrottleFilter.globalBufferSize.addAndGet(size);
         int serviceBufferSize = 
increaseServiceBufferSize(session.getService(), size);
 
@@ -376,8 +378,9 @@
         ReadThrottlePolicy policy = getPolicy();
         
         boolean enforcePolicy = false;
+        int sessionBufferSize;
         synchronized (state) {
-            int sessionBufferSize = (state.sessionBufferSize += size);
+            sessionBufferSize = (state.sessionBufferSize += size);
             if ((maxSessionBufferSize != 0 && sessionBufferSize >= 
maxSessionBufferSize) ||
                 (maxServiceBufferSize != 0 && serviceBufferSize >= 
maxServiceBufferSize) ||
                 (maxGlobalBufferSize  != 0 && globalBufferSize  >= 
maxGlobalBufferSize)) {
@@ -389,32 +392,43 @@
                 }
             }
         }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Enter: " + sessionBufferSize);
+        }
         
         if (enforcePolicy) {
             switch (policy) {
             case CLOSE:
-                log(session);
+                log(session, state);
                 session.close();
                 raiseException(session);
                 break;
             case EXCEPTION:
-                log(session);
-                session.suspendRead();
+                suspend(session, state, logger);
                 raiseException(session);
                 break;
             case BLOCK:
-                log(session);
-                session.suspendRead();
+                suspend(session, state, logger);
                 break;
             case LOG:
-                log(session);
+                log(session, state);
                 break;
             }
         }
     }
+
+    private void suspend(IoSession session, State state, Logger logger) {
+        log(session, state);
+        session.suspendRead();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Suspended: {}", getMessage(session));
+        }
+    }
     
     private void exit(IoSession session, int size) {
         State state = getState(session);
+        Logger logger = IoSessionLogger.getLogger(session, getClass());
 
         int globalBufferSize = 
ReadThrottleFilter.globalBufferSize.addAndGet(-size);
         if (globalBufferSize < 0) {
@@ -432,35 +446,108 @@
         int maxServiceBufferSize = this.maxServiceBufferSize;
         int maxSessionBufferSize = this.maxSessionBufferSize;
         
+        int sessionBufferSize;
+        
         boolean enforcePolicy = false;
         synchronized (state) {
-            int sessionBufferSize = (state.sessionBufferSize -= size);
+            sessionBufferSize = (state.sessionBufferSize -= size);
             if (sessionBufferSize < 0) {
-                state.sessionBufferSize = 0;
+                state.sessionBufferSize = sessionBufferSize = 0;
                 throw new IllegalStateException("sessionBufferSize < 0");
             }
-    
-            if ((maxSessionBufferSize == 0 || sessionBufferSize < 
maxSessionBufferSize) &&
+            if (state.suspendedRead &&
+                (maxGlobalBufferSize == 0 || globalBufferSize < 
maxGlobalBufferSize) &&
                 (maxServiceBufferSize == 0 || serviceBufferSize < 
maxServiceBufferSize) &&
-                (maxGlobalBufferSize  == 0 || globalBufferSize  < 
maxGlobalBufferSize)) {
+                (maxSessionBufferSize == 0 || sessionBufferSize < 
maxSessionBufferSize)) {
                 state.suspendedRead = false;
                 enforcePolicy = true;
             }
         }
         
+        if (logger.isDebugEnabled()) {
+            logger.debug("Exit: {}", state.sessionBufferSize);
+        }
+        
         if (enforcePolicy) {
             session.resumeRead();
+            if (logger.isDebugEnabled()) {
+                logger.debug("Resumed");
+            }
         }
+        
+        resumeOthers();
     }
+    
+    private void resumeOthers() {
+        long currentTime = System.currentTimeMillis();
+        
+        // Try to resume other sessions every other second.
+        boolean resumeOthers;
+        synchronized (globalResumeLock) {
+            if (currentTime - lastGlobalResumeTime > 1000) {
+                lastGlobalResumeTime = currentTime;
+                resumeOthers = true;
+            } else {
+                resumeOthers = false;
+            }
+        }
+        
+        if (resumeOthers) {
+            int maxGlobalBufferSize = this.maxGlobalBufferSize;
+            if (maxGlobalBufferSize == 0 || globalBufferSize.get() < 
maxGlobalBufferSize) {
+                for (IoService service: serviceBufferSizes.keySet()) {
+                    resumeService(service);
+                }
+            }
+            
+            synchronized (globalResumeLock) {
+                lastGlobalResumeTime = System.currentTimeMillis();
+            }
+        }
+    }
+    
+    private void resumeService(IoService service) {
+        int maxServiceBufferSize = this.maxServiceBufferSize;
+        if (maxServiceBufferSize == 0 || getServiceBufferSize(service) < 
maxServiceBufferSize) {
+            for (IoSession session: service.getManagedSessions()) {
+                resume(session);
+            }
+        }
+    }
+    
+    private void resume(IoSession session) {
+        State state = (State) session.getAttribute(STATE);
+        if (state == null) {
+            return;
+        }
+        
+        int maxSessionBufferSize = this.maxSessionBufferSize;
+        boolean resume = false;
+        synchronized (state) {
+            if (state.suspendedRead &&
+                (maxSessionBufferSize == 0 || state.sessionBufferSize < 
maxSessionBufferSize)) {
+                state.suspendedRead = false;
+                resume = true;
+            }
+        }
 
-    private void log(IoSession session) {
+        if (resume) {
+            session.resumeRead();
+            Logger logger = IoSessionLogger.getLogger(session, getClass());
+            if (logger.isDebugEnabled()) {
+                logger.debug("Resumed");
+            }
+        }
+    }
+
+    private void log(IoSession session, State state) {
         long currentTime = System.currentTimeMillis();
         
         // Prevent log flood by logging every 3 seconds.
         boolean log;
-        synchronized (logLock) {
-            if (currentTime - lastLogTime > 3000) {
-                lastLogTime = currentTime;
+        synchronized (state.logLock) {
+            if (currentTime - state.lastLogTime > 3000) {
+                state.lastLogTime = currentTime;
                 log = true;
             } else {
                 log = false;
@@ -538,5 +625,8 @@
     private static class State {
         private int sessionBufferSize;
         private boolean suspendedRead;
+
+        private final Object logLock = new Object();
+        private long lastLogTime = 0;
     }
 }


Reply via email to