Author: trustin
Date: Tue Mar 11 07:03:26 2008
New Revision: 635930

URL: http://svn.apache.org/viewvc?rev=635930&view=rev
Log:
Resolved issue: DIRMINA-547 - Reduce unnecessary thread creation and 
destruction caused by IdleStatusChecker
* IdleStatusChecker is no longer a singleton.
* Each AbstractIoService now owns exactly one IdleStatusChecker thread.
* The IdleStatusChecker thread starts automatically when a new service is
  created, and stops when the service is disposed.


Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=635930&r1=635929&r2=635930&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java 
Tue Mar 11 07:03:26 2008
@@ -26,6 +26,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -39,7 +40,9 @@
  * @version $Rev$, $Date$
  */
 public abstract class AbstractIoService implements IoService {
-    private static final IoServiceListener SERVICE_ACTIVATION_LISTENER =
+    private static final AtomicInteger id = new AtomicInteger();
+
+    private final IoServiceListener serviceActivationListener =
         new IoServiceListener() {
             public void serviceActivated(IoService service) {
                 // Update lastIoTime.
@@ -49,12 +52,11 @@
                 s.lastThroughputCalculationTime = s.getActivationTime();
                 
                 // Start idleness notification.
-                IdleStatusChecker.getInstance().addService(s);
+                idleStatusChecker.addService(s);
             }
 
             public void serviceDeactivated(IoService service) {
-                IdleStatusChecker.getInstance().removeService(
-                        (AbstractIoService) service);
+                idleStatusChecker.removeService((AbstractIoService) service);
             }
 
             public void serviceIdle(IoService service, IdleStatus idleStatus) 
{}
@@ -62,8 +64,6 @@
             public void sessionDestroyed(IoSession session) {}
     };
     
-    private static final AtomicInteger id = new AtomicInteger();
-
     /**
      * Current filter chain builder.
      */
@@ -117,6 +117,7 @@
     private double largestReadMessagesThroughput;
     private double largestWrittenMessagesThroughput;
 
+    private final IdleStatusChecker idleStatusChecker = new 
IdleStatusChecker();
     private final Object idlenessCheckLock = new Object();
     private int idleTimeForRead;
     private int idleTimeForWrite;
@@ -148,7 +149,7 @@
         }
 
         this.listeners = new IoServiceListenerSupport(this);
-        this.listeners.add(SERVICE_ACTIVATION_LISTENER);
+        this.listeners.add(serviceActivationListener);
         this.sessionConfig = sessionConfig;
         
         // Make JVM load the exception monitor before some transports
@@ -164,6 +165,8 @@
         }
         
         this.threadName = getClass().getSimpleName() + '-' + 
id.incrementAndGet();
+        
+        executeWorker(idleStatusChecker.getNotifyingTask(), 
"idleStatusChecker");
     }
 
     public final IoFilterChainBuilder getFilterChainBuilder() {
@@ -232,9 +235,20 @@
             }
         }
         
+        idleStatusChecker.getNotifyingTask().cancel();
         if (disposalFuture != null) {
             disposalFuture.awaitUninterruptibly();
         }
+        if (createdExecutor) {
+            ExecutorService e = (ExecutorService) executor;
+            while (!e.isTerminated()) {
+                try {
+                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
+                } catch (InterruptedException e1) {
+                    // Ignore; it should end shortly.
+                }
+            }
+        }
 
         disposed = true;
     }
@@ -726,6 +740,10 @@
     
     protected final IoServiceListenerSupport getListeners() {
         return listeners;
+    }
+    
+    protected final IdleStatusChecker getIdleStatusChecker() {
+        return idleStatusChecker;
     }
     
     protected final void executeWorker(Runnable worker) {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=635930&r1=635929&r2=635930&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java 
Tue Mar 11 07:03:26 2008
@@ -21,12 +21,8 @@
 
 import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.mina.util.ConcurrentHashSet;
-import org.apache.mina.util.NamePreservingRunnable;
 
 /**
  * Detects idle sessions and fires <tt>sessionIdle</tt> events to them.
@@ -35,97 +31,75 @@
  * @version $Rev: 525369 $, $Date: 2007-04-04 05:05:11 +0200 (mer., 04 avr. 
2007) $
  */
 public class IdleStatusChecker {
-    private static final IdleStatusChecker INSTANCE = new IdleStatusChecker();
-
-    public static IdleStatusChecker getInstance() {
-        return INSTANCE;
-    }
-
     private final Set<AbstractIoSession> sessions =
         new ConcurrentHashSet<AbstractIoSession>();
     private final Set<AbstractIoService> services =
         new ConcurrentHashSet<AbstractIoService>();
 
-    private final Object lock = new Object();
-    private final Runnable notifyingTask = new NamePreservingRunnable(
-            new NotifyingTask(), "IdleStatusChecker");
+    private final NotifyingTask notifyingTask = new NotifyingTaskImpl();
     private final IoFutureListener<IoFuture> sessionCloseListener =
         new SessionCloseListener();
-    private volatile ScheduledExecutorService executor;
 
-    private IdleStatusChecker() {}
+    public IdleStatusChecker() {}
 
     public void addSession(AbstractIoSession session) {
-        synchronized (lock) {
-            boolean start = false;
-            if (sessions.isEmpty() && services.isEmpty()) {
-                start = true;
-            }
-            if (!sessions.add(session)) {
-                return;
-            }
-            if (start) {
-                start();
-            }
-        }
-        
+        sessions.add(session);
         session.getCloseFuture().addListener(sessionCloseListener);
     }
     
     public void addService(AbstractIoService service) {
-        synchronized (lock) {
-            boolean start = false;
-            if (sessions.isEmpty() && services.isEmpty()) {
-                start = true;
-            }
-            if (!services.add(service)) {
-                return;
-            }
-            if (start) {
-                start();
-            }
-        }
+        services.add(service);
     }
 
     public void removeSession(AbstractIoSession session) {
-        synchronized (lock) {
-            sessions.remove(session);
-            if (sessions.isEmpty() && services.isEmpty()) {
-                stop();
-            }
-        }
+        sessions.remove(session);
     }
     
     public void removeService(AbstractIoService service) {
-        synchronized (lock) {
-            services.remove(service);
-            if (sessions.isEmpty() && services.isEmpty()) {
-                stop();
-            }
-        }
+        services.remove(service);
     }
-
-    private void start() {
-        ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(1);
-        this.executor = executor;
-        executor.scheduleWithFixedDelay(
-                notifyingTask, 1000, 1000, TimeUnit.MILLISECONDS);
+    
+    public NotifyingTask getNotifyingTask() {
+        return notifyingTask;
     }
     
-    private void stop() {
-        ScheduledExecutorService executor = this.executor;
-        if (executor == null) {
-            return;
-        }
-        executor.shutdownNow();
-        this.executor = null;
+    public interface NotifyingTask extends Runnable {
+        void cancel();
     }
 
-    private class NotifyingTask implements Runnable {
+    private class NotifyingTaskImpl implements NotifyingTask {
+        private volatile boolean cancelled;
+        private volatile Thread thread;
+        
         public void run() {
-            long currentTime = System.currentTimeMillis();
-            notifyServices(currentTime);
-            notifySessions(currentTime);
+            cancelled = false;
+            thread = Thread.currentThread();
+            try {
+                while (!cancelled) {
+                    // Check idleness with fixed delay (1 second).
+                    long currentTime = System.currentTimeMillis();
+                    notifyServices(currentTime);
+                    notifySessions(currentTime);
+                    
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // will exit the loop if interrupted from interrupt()
+                    }
+                }
+            } finally {
+                thread = null;
+            }
+        }
+        
+        public void cancel() {
+            Thread thread = this.thread;
+            if (thread == null) {
+                return;
+            }
+            
+            cancelled = true;
+            thread.interrupt();
         }
 
         private void notifyServices(long currentTime) {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=635930&r1=635929&r2=635930&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
 Tue Mar 11 07:03:26 2008
@@ -29,7 +29,6 @@
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.DefaultConnectFuture;
 import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IdleStatusChecker;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoFuture;
 import org.apache.mina.common.IoFutureListener;
@@ -104,7 +103,7 @@
 
             // The following sentences don't throw any exceptions.
             getListeners().fireSessionCreated(localSession);
-            IdleStatusChecker.getInstance().addSession(localSession);
+            getIdleStatusChecker().addSession(localSession);
         } catch (Throwable t) {
             future.setException(t);
             return future;
@@ -120,7 +119,7 @@
 
             // The following sentences don't throw any exceptions.
             entry.getListeners().fireSessionCreated(remoteSession);
-            IdleStatusChecker.getInstance().addSession(remoteSession);
+            getIdleStatusChecker().addSession(remoteSession);
         } catch (Throwable t) {
             ExceptionMonitor.getInstance().exceptionCaught(t);
             remoteSession.close();


Reply via email to