Author: trustin
Date: Tue Mar 11 07:03:26 2008
New Revision: 635930
URL: http://svn.apache.org/viewvc?rev=635930&view=rev
Log:
Resolved issue: DIRMINA-547 - Reduce unnecessary thread creation and
destruction caused by IdleStatusChecker
* IdleStatusChecker is not a singleton anymore
* One AbstractIoService has one IdleStatusChecker thread
* An IdleStatusChecker thread starts automatically when a new service is
created, and stops when the service is disposed.
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=635930&r1=635929&r2=635930&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
Tue Mar 11 07:03:26 2008
@@ -26,6 +26,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,7 +40,9 @@
* @version $Rev$, $Date$
*/
public abstract class AbstractIoService implements IoService {
- private static final IoServiceListener SERVICE_ACTIVATION_LISTENER =
+ private static final AtomicInteger id = new AtomicInteger();
+
+ private final IoServiceListener serviceActivationListener =
new IoServiceListener() {
public void serviceActivated(IoService service) {
// Update lastIoTime.
@@ -49,12 +52,11 @@
s.lastThroughputCalculationTime = s.getActivationTime();
// Start idleness notification.
- IdleStatusChecker.getInstance().addService(s);
+ idleStatusChecker.addService(s);
}
public void serviceDeactivated(IoService service) {
- IdleStatusChecker.getInstance().removeService(
- (AbstractIoService) service);
+ idleStatusChecker.removeService((AbstractIoService) service);
}
public void serviceIdle(IoService service, IdleStatus idleStatus)
{}
@@ -62,8 +64,6 @@
public void sessionDestroyed(IoSession session) {}
};
- private static final AtomicInteger id = new AtomicInteger();
-
/**
* Current filter chain builder.
*/
@@ -117,6 +117,7 @@
private double largestReadMessagesThroughput;
private double largestWrittenMessagesThroughput;
+ private final IdleStatusChecker idleStatusChecker = new
IdleStatusChecker();
private final Object idlenessCheckLock = new Object();
private int idleTimeForRead;
private int idleTimeForWrite;
@@ -148,7 +149,7 @@
}
this.listeners = new IoServiceListenerSupport(this);
- this.listeners.add(SERVICE_ACTIVATION_LISTENER);
+ this.listeners.add(serviceActivationListener);
this.sessionConfig = sessionConfig;
// Make JVM load the exception monitor before some transports
@@ -164,6 +165,8 @@
}
this.threadName = getClass().getSimpleName() + '-' +
id.incrementAndGet();
+
+ executeWorker(idleStatusChecker.getNotifyingTask(),
"idleStatusChecker");
}
public final IoFilterChainBuilder getFilterChainBuilder() {
@@ -232,9 +235,20 @@
}
}
+ idleStatusChecker.getNotifyingTask().cancel();
if (disposalFuture != null) {
disposalFuture.awaitUninterruptibly();
}
+ if (createdExecutor) {
+ ExecutorService e = (ExecutorService) executor;
+ while (!e.isTerminated()) {
+ try {
+ e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
+ } catch (InterruptedException e1) {
+ // Ignore; it should end shortly.
+ }
+ }
+ }
disposed = true;
}
@@ -726,6 +740,10 @@
protected final IoServiceListenerSupport getListeners() {
return listeners;
+ }
+
+ protected final IdleStatusChecker getIdleStatusChecker() {
+ return idleStatusChecker;
}
protected final void executeWorker(Runnable worker) {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=635930&r1=635929&r2=635930&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
Tue Mar 11 07:03:26 2008
@@ -21,12 +21,8 @@
import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.mina.util.ConcurrentHashSet;
-import org.apache.mina.util.NamePreservingRunnable;
/**
* Detects idle sessions and fires <tt>sessionIdle</tt> events to them.
@@ -35,97 +31,75 @@
* @version $Rev: 525369 $, $Date: 2007-04-04 05:05:11 +0200 (mer., 04 avr.
2007) $
*/
public class IdleStatusChecker {
- private static final IdleStatusChecker INSTANCE = new IdleStatusChecker();
-
- public static IdleStatusChecker getInstance() {
- return INSTANCE;
- }
-
private final Set<AbstractIoSession> sessions =
new ConcurrentHashSet<AbstractIoSession>();
private final Set<AbstractIoService> services =
new ConcurrentHashSet<AbstractIoService>();
- private final Object lock = new Object();
- private final Runnable notifyingTask = new NamePreservingRunnable(
- new NotifyingTask(), "IdleStatusChecker");
+ private final NotifyingTask notifyingTask = new NotifyingTaskImpl();
private final IoFutureListener<IoFuture> sessionCloseListener =
new SessionCloseListener();
- private volatile ScheduledExecutorService executor;
- private IdleStatusChecker() {}
+ public IdleStatusChecker() {}
public void addSession(AbstractIoSession session) {
- synchronized (lock) {
- boolean start = false;
- if (sessions.isEmpty() && services.isEmpty()) {
- start = true;
- }
- if (!sessions.add(session)) {
- return;
- }
- if (start) {
- start();
- }
- }
-
+ sessions.add(session);
session.getCloseFuture().addListener(sessionCloseListener);
}
public void addService(AbstractIoService service) {
- synchronized (lock) {
- boolean start = false;
- if (sessions.isEmpty() && services.isEmpty()) {
- start = true;
- }
- if (!services.add(service)) {
- return;
- }
- if (start) {
- start();
- }
- }
+ services.add(service);
}
public void removeSession(AbstractIoSession session) {
- synchronized (lock) {
- sessions.remove(session);
- if (sessions.isEmpty() && services.isEmpty()) {
- stop();
- }
- }
+ sessions.remove(session);
}
public void removeService(AbstractIoService service) {
- synchronized (lock) {
- services.remove(service);
- if (sessions.isEmpty() && services.isEmpty()) {
- stop();
- }
- }
+ services.remove(service);
}
-
- private void start() {
- ScheduledExecutorService executor =
Executors.newScheduledThreadPool(1);
- this.executor = executor;
- executor.scheduleWithFixedDelay(
- notifyingTask, 1000, 1000, TimeUnit.MILLISECONDS);
+
+ public NotifyingTask getNotifyingTask() {
+ return notifyingTask;
}
- private void stop() {
- ScheduledExecutorService executor = this.executor;
- if (executor == null) {
- return;
- }
- executor.shutdownNow();
- this.executor = null;
+ public interface NotifyingTask extends Runnable {
+ void cancel();
}
- private class NotifyingTask implements Runnable {
+ private class NotifyingTaskImpl implements NotifyingTask {
+ private volatile boolean cancelled;
+ private volatile Thread thread;
+
public void run() {
- long currentTime = System.currentTimeMillis();
- notifyServices(currentTime);
- notifySessions(currentTime);
+ cancelled = false;
+ thread = Thread.currentThread();
+ try {
+ while (!cancelled) {
+ // Check idleness with fixed delay (1 second).
+ long currentTime = System.currentTimeMillis();
+ notifyServices(currentTime);
+ notifySessions(currentTime);
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // will exit the loop if interrupted from interrupt()
+ }
+ }
+ } finally {
+ thread = null;
+ }
+ }
+
+ public void cancel() {
+ Thread thread = this.thread;
+ if (thread == null) {
+ return;
+ }
+
+ cancelled = true;
+ thread.interrupt();
}
private void notifyServices(long currentTime) {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=635930&r1=635929&r2=635930&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
Tue Mar 11 07:03:26 2008
@@ -29,7 +29,6 @@
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.DefaultConnectFuture;
import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IdleStatusChecker;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoFuture;
import org.apache.mina.common.IoFutureListener;
@@ -104,7 +103,7 @@
// The following sentences don't throw any exceptions.
getListeners().fireSessionCreated(localSession);
- IdleStatusChecker.getInstance().addSession(localSession);
+ getIdleStatusChecker().addSession(localSession);
} catch (Throwable t) {
future.setException(t);
return future;
@@ -120,7 +119,7 @@
// The following sentences don't throw any exceptions.
entry.getListeners().fireSessionCreated(remoteSession);
- IdleStatusChecker.getInstance().addSession(remoteSession);
+ getIdleStatusChecker().addSession(remoteSession);
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
remoteSession.close();