Author: trustin
Date: Thu Nov 8 22:18:21 2007
New Revision: 593425
URL: http://svn.apache.org/viewvc?rev=593425&view=rev
Log:
* Added IoEventQueueThrottle
* Added UnorderedThreadPoolExecutor that knows how to interact with
IoEventQueueHandler
* Added IoEventSizeEstimator and its default implementation.
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
(with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
(with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
(with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
(with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java?rev=593425&view=auto
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
(added)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
Thu Nov 8 22:18:21 2007
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoEvent;
+
+/**
+ * A default {@link IoEventSizeEstimator} implementation.
+ * <p>
+ * <a href="http://martin.nobilitas.com/java/sizeof.html">Martin's Java Notes</a>
+ * was used for estimation.  For unknown types, it inspects the declared fields of the
+ * class of the specified event and of the parameter of the event.  The size of each
+ * declared field of unknown type is approximated by the specified
+ * <tt>averageSizePerField</tt> (default: 64).
+ * <p>
+ * All the estimated sizes of classes are cached for performance improvement.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class DefaultIoEventSizeEstimator implements IoEventSizeEstimator {
+
+    /** Cache of estimated sizes keyed by class; pre-seeded with the primitive types. */
+    private final Map<Class<?>, Integer> class2size = new ConcurrentHashMap<Class<?>, Integer>();
+
+    /** Size charged for a field whose type has no cached estimate yet. */
+    private final int averageSizePerField;
+
+    /**
+     * Creates an estimator with an <tt>averageSizePerField</tt> of 64.
+     */
+    public DefaultIoEventSizeEstimator() {
+        this(64);
+    }
+
+    /**
+     * Creates an estimator with the specified <tt>averageSizePerField</tt>.
+     *
+     * @param averageSizePerField the size assumed for a field whose type has not been estimated yet
+     * @throws IllegalArgumentException if <tt>averageSizePerField</tt> is zero or negative
+     */
+    public DefaultIoEventSizeEstimator(int averageSizePerField) {
+        if (averageSizePerField <= 0) {
+            throw new IllegalArgumentException("averageSizePerField: " + averageSizePerField);
+        }
+        this.averageSizePerField = averageSizePerField;
+
+        class2size.put(boolean.class, 4); // Probably an integer.
+        class2size.put(byte.class, 1);
+        class2size.put(char.class, 2);
+        // short used to be missing, so short fields were sized through the
+        // reflective fallback (8 after alignment) instead of their real width.
+        class2size.put(short.class, 2);
+        class2size.put(int.class, 4);
+        class2size.put(long.class, 8);
+        class2size.put(float.class, 4);
+        class2size.put(double.class, 8);
+    }
+
+    /**
+     * Estimates the size of the specified event as the sum of the estimate for
+     * the event object itself and the estimate for its parameter.
+     *
+     * @param event the event to measure
+     * @return the estimated size
+     */
+    public int estimateSize(IoEvent event) {
+        return estimateSize((Object) event) + estimateSize(event.getParameter());
+    }
+
+    /**
+     * Estimates the size of an arbitrary object.  Buffers and character
+     * sequences are sized from their current content, iterables from their
+     * class plus every element, and anything else from its class alone.
+     */
+    private int estimateSize(Object message) {
+        if (message == null) {
+            return 8;
+        }
+
+        if (message instanceof IoBuffer) {
+            return align(46 + ((IoBuffer) message).remaining());
+        }
+
+        if (message instanceof CharSequence) {
+            // Two bytes per character.
+            return align(38 + (((CharSequence) message).length() << 1));
+        }
+
+        if (message instanceof Iterable) {
+            int answer = estimateSize(message.getClass());
+            for (Object m: (Iterable<?>) message) {
+                answer += estimateSize(m);
+            }
+            return answer;
+        }
+
+        return estimateSize(message.getClass());
+    }
+
+    /**
+     * Estimates the size of an instance of the specified class from its
+     * non-static declared fields (including inherited ones) and caches the
+     * result in {@link #class2size}.
+     */
+    private int estimateSize(Class<?> clazz) {
+        Integer objectSize = class2size.get(clazz);
+        if (objectSize != null) {
+            return objectSize;
+        }
+
+        int answer = 8; // Basic overhead.
+        synchronized (class2size) {
+
+            // Get the rough estimation.
+            for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+                Field[] fields = c.getDeclaredFields();
+                for (Field f: fields) {
+                    if ((f.getModifiers() & Modifier.STATIC) != 0) {
+                        // Ignore static fields.
+                        continue;
+                    }
+
+                    Integer fieldSize = class2size.get(f.getType());
+                    if (fieldSize == null) {
+                        answer += averageSizePerField;
+                    } else {
+                        answer += fieldSize;
+                    }
+                }
+            }
+
+            // Put the intermediate answer to prevent infinite recursion
+            // when a field type refers back to this class.
+            class2size.put(clazz, answer);
+
+            // Now include field classes, too.
+            for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+                Field[] fields = c.getDeclaredFields();
+                for (Field f: fields) {
+                    if ((f.getModifiers() & Modifier.STATIC) != 0) {
+                        // Ignore static fields.
+                        continue;
+                    }
+
+                    if (!class2size.containsKey(f.getType())) {
+                        // Compensate previous rough estimation
+                        answer += estimateSize(f.getType()) - averageSizePerField;
+                    }
+                }
+            }
+
+            // The compensation above can drive the total to zero or below.
+            if (answer <= 0) {
+                answer = averageSizePerField;
+            }
+
+            // Some alignment.
+            answer = align(answer);
+
+            // Put the final answer.
+            class2size.put(clazz, answer);
+        }
+
+        return answer;
+    }
+
+    /**
+     * Rounds the specified size up to the next multiple of 8.
+     */
+    private static int align(int size) {
+        if (size % 8 != 0) {
+            size /= 8;
+            size ++;
+            size *= 8;
+        }
+        return size;
+    }
+}
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java?rev=593425&r1=593424&r2=593425&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
Thu Nov 8 22:18:21 2007
@@ -20,11 +20,12 @@
package org.apache.mina.filter.executor;
import java.util.EventListener;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.mina.common.IoEvent;
/**
- * Listens to all event queue operations occurring in {@link OrderedThreadPoolExecutor}.
+ * Listens to all event queue operations occurring in {@link IoThreadPoolExecutor}.
*
* @author The Apache MINA Project ([EMAIL PROTECTED])
* @version $Rev$, $Date$
@@ -35,17 +36,17 @@
* allowed to be offered to the event queue. The <tt>event</tt> is dropped
* if <tt>false</tt> is returned.
*/
- boolean accept(OrderedThreadPoolExecutor executor, IoEvent event);
+ boolean accept(ThreadPoolExecutor executor, IoEvent event);
/**
* Invoked after the specified <tt>event</tt> has been offered to the
* event queue.
*/
- void offered(OrderedThreadPoolExecutor executor, IoEvent event);
+ void offered(ThreadPoolExecutor executor, IoEvent event);
/**
* Invoked after the specified <tt>event</tt> has been polled from the
* event queue.
*/
- void polled(OrderedThreadPoolExecutor executor, IoEvent event);
+ void polled(ThreadPoolExecutor executor, IoEvent event);
}
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java?rev=593425&view=auto
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
(added)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
Thu Nov 8 22:18:21 2007
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.common.IoEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Throttles incoming events into {@link OrderedThreadPoolExecutor}.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class IoEventQueueThrottle implements IoEventQueueHandler {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private final IoEventSizeEstimator eventSizeEstimator;
+
+    /** Total estimated size at or above which offering threads are blocked. */
+    private volatile int threshold;
+
+    /** Monitor on which blocked offering threads wait; also guards {@link #waiters}. */
+    private final Object lock = new Object();
+
+    /** Sum of the estimated sizes of the events offered but not yet polled. */
+    private final AtomicInteger counter = new AtomicInteger();
+
+    /** Number of threads currently waiting on {@link #lock}. */
+    private int waiters;
+
+    /**
+     * Creates a throttle with a {@link DefaultIoEventSizeEstimator} and a
+     * threshold of 65536.
+     */
+    public IoEventQueueThrottle() {
+        this(new DefaultIoEventSizeEstimator(), 65536);
+    }
+
+    /**
+     * Creates a throttle with a {@link DefaultIoEventSizeEstimator} and the
+     * specified threshold.
+     *
+     * @throws IllegalArgumentException if <tt>threshold</tt> is zero or negative
+     */
+    public IoEventQueueThrottle(int threshold) {
+        this(new DefaultIoEventSizeEstimator(), threshold);
+    }
+
+    /**
+     * Creates a throttle with the specified estimator and threshold.
+     *
+     * @throws NullPointerException if <tt>eventSizeEstimator</tt> is <tt>null</tt>
+     * @throws IllegalArgumentException if <tt>threshold</tt> is zero or negative
+     */
+    public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int threshold) {
+        if (eventSizeEstimator == null) {
+            throw new NullPointerException("eventSizeEstimator");
+        }
+        this.eventSizeEstimator = eventSizeEstimator;
+
+        setThreshold(threshold);
+    }
+
+    public IoEventSizeEstimator getEventSizeEstimator() {
+        return eventSizeEstimator;
+    }
+
+    public int getThreshold() {
+        return threshold;
+    }
+
+    /**
+     * Changes the threshold.
+     *
+     * @throws IllegalArgumentException if <tt>threshold</tt> is zero or negative
+     */
+    public void setThreshold(int threshold) {
+        if (threshold <= 0) {
+            throw new IllegalArgumentException("threshold: " + threshold);
+        }
+        this.threshold = threshold;
+    }
+
+    /**
+     * Always accepts the event; throttling is done by blocking in
+     * {@link #offered(ThreadPoolExecutor, IoEvent)}, not by dropping events.
+     */
+    public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
+        return true;
+    }
+
+    /**
+     * Adds the estimated size of the event to the counter and blocks the
+     * calling thread while the counter is at or above the threshold.
+     */
+    public void offered(ThreadPoolExecutor executor, IoEvent event) {
+        int eventSize = getEventSizeEstimator().estimateSize(event);
+        int currentCounter = counter.addAndGet(eventSize);
+        logState();
+
+        if (currentCounter >= threshold) {
+            block();
+        }
+    }
+
+    /**
+     * Subtracts the estimated size of the event from the counter and wakes
+     * up blocked threads once the counter is below the threshold.
+     */
+    public void polled(ThreadPoolExecutor executor, IoEvent event) {
+        int eventSize = getEventSizeEstimator().estimateSize(event);
+        int currentCounter = counter.addAndGet(-eventSize);
+        logState();
+
+        if (currentCounter < threshold) {
+            unblock();
+        }
+    }
+
+    private void logState() {
+        if (logger.isDebugEnabled()) {
+            logger.debug(Thread.currentThread().getName() + " state: " + counter.get() + " / " + getThreshold());
+        }
+    }
+
+    /**
+     * Blocks the calling thread until the counter drops below the threshold.
+     * The wait itself cannot be interrupted, but an interruption received
+     * while waiting is remembered and the interrupt status is restored
+     * before returning.
+     */
+    protected void block() {
+        if (logger.isDebugEnabled()) {
+            logger.debug(Thread.currentThread().getName() + " blocked: " + counter.get() + " >= " + threshold);
+        }
+
+        boolean interrupted = false;
+        synchronized (lock) {
+            while (counter.get() >= threshold) {
+                waiters ++;
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    // Wait uninterruptably, but do not lose the request.
+                    // The flag is re-set only after the loop: setting it
+                    // here would make the next wait() throw immediately.
+                    interrupted = true;
+                } finally {
+                    waiters --;
+                }
+            }
+        }
+
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug(Thread.currentThread().getName() + " unblocked: " + counter.get() + " < " + threshold);
+        }
+    }
+
+    /**
+     * Wakes up every thread blocked in {@link #block()}.  Each of them
+     * re-checks the counter, so waking all of them is safe, and a single poll
+     * that frees room for several waiters releases all of them at once.
+     */
+    protected void unblock() {
+        synchronized (lock) {
+            if (waiters > 0) {
+                lock.notifyAll();
+            }
+        }
+    }
+}
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java?rev=593425&view=auto
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
(added)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
Thu Nov 8 22:18:21 2007
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import org.apache.mina.common.IoEvent;
+
+/**
+ * Estimates the amount of memory that the specified {@link IoEvent} occupies
+ * in the current JVM.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public interface IoEventSizeEstimator {
+ /**
+  * Returns the estimated amount of memory occupied by the specified event.
+  * NOTE(review): the unit is presumably bytes (the default implementation
+  * charges 1 for a <tt>byte</tt> field) - confirm.
+  *
+  * @param event the event to measure
+  * @return the estimated size of the event
+  */
+ int estimateSize(IoEvent event);
+}
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=593425&r1=593424&r2=593425&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
Thu Nov 8 22:18:21 2007
@@ -50,11 +50,11 @@
private static final IoSession EXIT_SIGNAL = new DummySession();
private static final IoEventQueueHandler NOOP_QUEUE_MONITOR = new
IoEventQueueHandler() {
- public boolean accept(OrderedThreadPoolExecutor executor, IoEvent
event) {
+ public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
return true;
}
- public void offered(OrderedThreadPoolExecutor executor, IoEvent event)
{}
- public void polled(OrderedThreadPoolExecutor executor, IoEvent event)
{}
+ public void offered(ThreadPoolExecutor executor, IoEvent event) {}
+ public void polled(ThreadPoolExecutor executor, IoEvent event) {}
};
private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
@@ -72,6 +72,10 @@
private volatile IoEventQueueHandler queueHandler;
+ public OrderedThreadPoolExecutor() {
+ this(16);
+ }
+
public OrderedThreadPoolExecutor(int maximumPoolSize) {
this(0, maximumPoolSize);
}
@@ -276,27 +280,32 @@
IoSession s = e.getSession();
SessionBuffer buf = getSessionBuffer(s);
Queue<Runnable> queue = buf.queue;
- boolean offer;
+ boolean offerSession;
+ boolean offeredEvent;
synchronized (queue) {
- if (queueHandler.accept(this, e)) {
+ offeredEvent = queueHandler.accept(this, e);
+ if (offeredEvent) {
queue.offer(e);
- queueHandler.offered(this, e);
if (buf.processingCompleted) {
buf.processingCompleted = false;
- offer = true;
+ offerSession = true;
} else {
- offer = false;
+ offerSession = false;
}
} else {
- offer = false;
+ offerSession = false;
}
}
- if (offer) {
+ if (offerSession) {
waitingSessions.offer(s);
}
addWorkerIfNecessary();
+
+ if (offeredEvent) {
+ queueHandler.offered(this, e);
+ }
}
private void rejectTask(Runnable task) {
@@ -453,7 +462,7 @@
if (session == null) {
synchronized (workers) {
- if (workers.size() >= corePoolSize) {
+ if (workers.size() > corePoolSize) {
// Remove now to prevent duplicate exit.
workers.remove(this);
break;
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java?rev=593425&view=auto
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
(added)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
Thu Nov 8 22:18:21 2007
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.common.IoEvent;
+
+/**
+ * A {@link ThreadPoolExecutor} that does not maintain the order of {@link IoEvent}s.
+ * This means more than one event handler method can be invoked at the same
+ * time with mixed order.  For example, let's assume that messageReceived, messageSent,
+ * and sessionClosed events are fired.
+ * <ul>
+ * <li>All event handler methods can be called simultaneously.
+ *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
+ * <li>The event order can be mixed up.
+ *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
+ *     is invoked.)</li>
+ * </ul>
+ * If you need to maintain the order of events per session, please use
+ * {@link OrderedThreadPoolExecutor}.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
+
+    /** Sentinel put on the task queue to tell the worker that takes it to exit. */
+    private static final Runnable EXIT_SIGNAL = new Runnable() {
+        public void run() {}
+    };
+
+    /** Handler used when none is configured: accepts every event and tracks nothing. */
+    private static final IoEventQueueHandler NOOP_QUEUE_MONITOR = new IoEventQueueHandler() {
+        public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
+            return true;
+        }
+        public void offered(ThreadPoolExecutor executor, IoEvent event) {}
+        public void polled(ThreadPoolExecutor executor, IoEvent event) {}
+    };
+
+    /** Live workers.  This set is also the monitor for pool-size changes and termination waits. */
+    private final Set<Worker> workers = new HashSet<Worker>();
+
+    private volatile int corePoolSize;
+    private volatile int maximumPoolSize;
+    private volatile int largestPoolSize;
+
+    /** Number of workers that are not currently running a task. */
+    private final AtomicInteger idleWorkers = new AtomicInteger();
+
+    /** Tasks completed by workers that have already exited; guarded by {@link #workers}. */
+    private long completedTaskCount;
+    private volatile boolean shutdown;
+
+    private volatile IoEventQueueHandler queueHandler;
+
+    /**
+     * Creates an executor with no core threads and at most 16 threads.
+     */
+    public UnorderedThreadPoolExecutor() {
+        this(16);
+    }
+
+    public UnorderedThreadPoolExecutor(int maximumPoolSize) {
+        this(0, maximumPoolSize);
+    }
+
+    /**
+     * Creates an executor with a keep-alive time of 30 seconds.
+     */
+    public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
+        this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
+    }
+
+    public UnorderedThreadPoolExecutor(
+            int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
+    }
+
+    public UnorderedThreadPoolExecutor(
+            int corePoolSize, int maximumPoolSize,
+            long keepAliveTime, TimeUnit unit,
+            IoEventQueueHandler queueMonitor) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueMonitor);
+    }
+
+    public UnorderedThreadPoolExecutor(
+            int corePoolSize, int maximumPoolSize,
+            long keepAliveTime, TimeUnit unit,
+            ThreadFactory threadFactory) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
+    }
+
+    /**
+     * Creates an executor.
+     *
+     * @param queueMonitor the handler notified of queue operations, or
+     *                     <tt>null</tt> to use a handler that accepts everything
+     * @throws IllegalArgumentException if <tt>corePoolSize</tt> is negative, or
+     *         <tt>maximumPoolSize</tt> is zero or less than <tt>corePoolSize</tt>
+     */
+    public UnorderedThreadPoolExecutor(
+            int corePoolSize, int maximumPoolSize,
+            long keepAliveTime, TimeUnit unit,
+            ThreadFactory threadFactory, IoEventQueueHandler queueMonitor) {
+        // The pool sizes handed to the superclass are placeholders; this class
+        // keeps its own corePoolSize / maximumPoolSize and its own workers.
+        super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
+        if (corePoolSize < 0) {
+            throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
+        }
+
+        if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
+            throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
+        }
+
+        this.corePoolSize = corePoolSize;
+        this.maximumPoolSize = maximumPoolSize;
+        setQueueHandler(queueMonitor);
+    }
+
+    public IoEventQueueHandler getQueueHandler() {
+        return queueHandler;
+    }
+
+    /**
+     * Sets the queue handler; <tt>null</tt> selects a handler that accepts
+     * every event and does nothing else.
+     */
+    public void setQueueHandler(IoEventQueueHandler queueHandler) {
+        if (queueHandler == null) {
+            queueHandler = NOOP_QUEUE_MONITOR;
+        }
+        this.queueHandler = queueHandler;
+    }
+
+    @Override
+    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+        // Ignore the request.  It must always be AbortPolicy.
+    }
+
+    /**
+     * Starts one more worker thread unless the pool already has
+     * <tt>maximumPoolSize</tt> workers.
+     */
+    private void addWorker() {
+        synchronized (workers) {
+            if (workers.size() >= maximumPoolSize) {
+                return;
+            }
+
+            Worker worker = new Worker();
+            Thread thread = getThreadFactory().newThread(worker);
+            // A new worker starts out idle; count it before it can fetch a task.
+            idleWorkers.incrementAndGet();
+            thread.start();
+            workers.add(worker);
+
+            if (workers.size() > largestPoolSize) {
+                largestPoolSize = workers.size();
+            }
+        }
+    }
+
+    /**
+     * Starts a worker when no idle worker is available.
+     */
+    private void addWorkerIfNecessary() {
+        if (idleWorkers.get() == 0) {
+            synchronized (workers) {
+                if (workers.isEmpty() || idleWorkers.get() == 0) {
+                    addWorker();
+                }
+            }
+        }
+    }
+
+    /**
+     * Asks one worker to exit unless the pool is already at or below
+     * <tt>corePoolSize</tt>.
+     */
+    private void removeWorker() {
+        synchronized (workers) {
+            if (workers.size() <= corePoolSize) {
+                return;
+            }
+            getQueue().offer(EXIT_SIGNAL);
+        }
+    }
+
+    @Override
+    public int getMaximumPoolSize() {
+        return maximumPoolSize;
+    }
+
+    @Override
+    public void setMaximumPoolSize(int maximumPoolSize) {
+        synchronized (workers) {
+            if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
+                throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
+            }
+
+            if (this.maximumPoolSize > maximumPoolSize) {
+                for (int i = this.maximumPoolSize - maximumPoolSize; i > 0; i --) {
+                    removeWorker();
+                }
+            }
+            this.maximumPoolSize = maximumPoolSize;
+        }
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException {
+
+        long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
+
+        synchronized (workers) {
+            // Exiting workers call workers.notifyAll(); re-check after every wake-up.
+            while (!isTerminated()) {
+                long waitTime = deadline - System.currentTimeMillis();
+                if (waitTime <= 0) {
+                    break;
+                }
+
+                workers.wait(waitTime);
+            }
+        }
+        return isTerminated();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return shutdown;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        if (!shutdown) {
+            return false;
+        }
+
+        synchronized (workers) {
+            return workers.isEmpty();
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        if (shutdown) {
+            return;
+        }
+
+        shutdown = true;
+
+        synchronized (workers) {
+            // One exit signal per live worker.
+            for (int i = workers.size(); i > 0; i --) {
+                getQueue().offer(EXIT_SIGNAL);
+            }
+        }
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        shutdown();
+
+        List<Runnable> answer = new ArrayList<Runnable>();
+        Runnable task;
+        while ((task = getQueue().poll()) != null) {
+            if (task == EXIT_SIGNAL) {
+                // Exit signals belong to the workers; put it back.
+                getQueue().offer(EXIT_SIGNAL);
+                Thread.yield(); // Let others take the signal.
+                continue;
+            }
+
+            answer.add(task);
+        }
+
+        return answer;
+    }
+
+    /**
+     * Queues the specified event for execution.
+     *
+     * @param task the event to run; must be an {@link IoEvent}
+     * @throws IllegalArgumentException if <tt>task</tt> is not an {@link IoEvent}
+     */
+    @Override
+    public void execute(Runnable task) {
+        if (shutdown) {
+            // The handler is always AbortPolicy (see the constructor and
+            // setRejectedExecutionHandler), so this call throws.
+            rejectTask(task);
+        }
+
+        checkTaskType(task);
+
+        IoEvent e = (IoEvent) task;
+        boolean offeredEvent = queueHandler.accept(this, e);
+        if (offeredEvent) {
+            getQueue().offer(e);
+        }
+
+        addWorkerIfNecessary();
+
+        // Notify only after a worker is available to drain the queue.
+        if (offeredEvent) {
+            queueHandler.offered(this, e);
+        }
+    }
+
+    private void rejectTask(Runnable task) {
+        getRejectedExecutionHandler().rejectedExecution(task, this);
+    }
+
+    private void checkTaskType(Runnable task) {
+        if (!(task instanceof IoEvent)) {
+            throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
+        }
+    }
+
+    @Override
+    public int getActiveCount() {
+        synchronized (workers) {
+            return workers.size() - idleWorkers.get();
+        }
+    }
+
+    @Override
+    public long getCompletedTaskCount() {
+        synchronized (workers) {
+            // Tasks of exited workers plus the running totals of live ones.
+            long answer = completedTaskCount;
+            for (Worker w: workers) {
+                answer += w.completedTaskCount;
+            }
+
+            return answer;
+        }
+    }
+
+    @Override
+    public int getLargestPoolSize() {
+        return largestPoolSize;
+    }
+
+    @Override
+    public int getPoolSize() {
+        synchronized (workers) {
+            return workers.size();
+        }
+    }
+
+    /**
+     * Returns the same value as {@link #getCompletedTaskCount()}; queued and
+     * running tasks are not included.
+     */
+    @Override
+    public long getTaskCount() {
+        return getCompletedTaskCount();
+    }
+
+    @Override
+    public boolean isTerminating() {
+        synchronized (workers) {
+            return isShutdown() && !isTerminated();
+        }
+    }
+
+    @Override
+    public int prestartAllCoreThreads() {
+        int answer = 0;
+        synchronized (workers) {
+            for (int i = corePoolSize - workers.size() ; i > 0; i --) {
+                addWorker();
+                answer ++;
+            }
+        }
+        return answer;
+    }
+
+    @Override
+    public boolean prestartCoreThread() {
+        synchronized (workers) {
+            if (workers.size() < corePoolSize) {
+                addWorker();
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    @Override
+    public int getCorePoolSize() {
+        return corePoolSize;
+    }
+
+    @Override
+    public void setCorePoolSize(int corePoolSize) {
+        if (corePoolSize < 0) {
+            throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
+        }
+
+        synchronized (workers) {
+            if (this.corePoolSize > corePoolSize) {
+                for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
+                    removeWorker();
+                }
+            }
+            this.corePoolSize = corePoolSize;
+        }
+    }
+
+    /**
+     * A pool thread body: repeatedly fetches a task from the queue and runs
+     * it until it receives {@link #EXIT_SIGNAL} or idles out above
+     * <tt>corePoolSize</tt>.
+     */
+    private class Worker implements Runnable {
+
+        private volatile long completedTaskCount;
+        private Thread thread;
+
+        public void run() {
+            thread = Thread.currentThread();
+
+            for (;;) {
+                Runnable task = fetchTask();
+
+                idleWorkers.decrementAndGet();
+
+                if (task == null) {
+                    // Keep-alive time elapsed without any work.
+                    synchronized (workers) {
+                        if (workers.size() > corePoolSize) {
+                            // Remove now to prevent duplicate exit.
+                            workers.remove(this);
+                            break;
+                        }
+                    }
+                }
+
+                if (task == EXIT_SIGNAL) {
+                    break;
+                }
+
+                try {
+                    // A core worker that merely timed out has no task.  It
+                    // must go back to waiting instead of passing null to the
+                    // handler and to runTask(), which failed with a
+                    // NullPointerException and killed the worker.
+                    if (task != null) {
+                        queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
+                        runTask(task);
+                    }
+                } finally {
+                    // Always balance the decrement above, even when the
+                    // handler or the task throws.
+                    idleWorkers.incrementAndGet();
+                }
+            }
+
+            synchronized (workers) {
+                workers.remove(this);
+                UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
+                // Wake up threads blocked in awaitTermination().
+                workers.notifyAll();
+            }
+        }
+
+        /**
+         * Waits up to the keep-alive time for a task.
+         *
+         * @return the next task, or <tt>null</tt> if none arrived in time
+         */
+        private Runnable fetchTask() {
+            Runnable task = null;
+            long currentTime = System.currentTimeMillis();
+            long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
+            for (;;) {
+                try {
+                    long waitTime = deadline - currentTime;
+                    if (waitTime <= 0) {
+                        break;
+                    }
+
+                    try {
+                        task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
+                        break;
+                    } finally {
+                        // Refresh the clock so that a retry after an
+                        // interruption waits only for the remaining time.
+                        if (task == null) {
+                            currentTime = System.currentTimeMillis();
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // Ignore.
+                    continue;
+                }
+            }
+            return task;
+        }
+
+        /**
+         * Runs the task between the <tt>beforeExecute</tt> and
+         * <tt>afterExecute</tt> hooks and counts it when it completes normally.
+         */
+        private void runTask(Runnable task) {
+            beforeExecute(thread, task);
+            boolean ran = false;
+            try {
+                task.run();
+                ran = true;
+                afterExecute(task, null);
+                completedTaskCount ++;
+            } catch (RuntimeException e) {
+                // Do not call afterExecute() twice when it is the hook that threw.
+                if (!ran) {
+                    afterExecute(task, e);
+                }
+                throw e;
+            }
+        }
+    }
+}
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date