Author: trustin
Date: Thu Nov  8 22:18:21 2007
New Revision: 593425

URL: http://svn.apache.org/viewvc?rev=593425&view=rev
Log:
* Added IoEventQueueThrottle
* Added UnorderedThreadPoolExecutor that knows how to interact with 
IoEventQueueHandler
* Added IoEventSizeEstimator and its default implementation.

Added:
    
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
   (with props)
    
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
   (with props)
    
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
   (with props)
    
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
   (with props)
Modified:
    
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
    
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java

Added: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java?rev=593425&view=auto
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
 (added)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
 Thu Nov  8 22:18:21 2007
@@ -0,0 +1,164 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoEvent;
+
+/**
+ * A default [EMAIL PROTECTED] IoEventSizeEstimator} implementation.
+ * <p>
+ * <a href="http://martin.nobilitas.com/java/sizeof.html";>Martin's Java 
Notes</a>
+ * was used for estimation.  For unknown types, it inspects declaring fields 
of the
+ * class of the specified event and the parameter of the event.  The size of 
unknown
+ * declaring fields are approximated to the specified 
<tt>averageSizePerField</tt>
+ * (default: 64).
+ * <p>
+ * All the estimated sizes of classes are cached for performance improvement.
+ * 
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class DefaultIoEventSizeEstimator implements IoEventSizeEstimator {
+
+    private final Map<Class<?>, Integer> class2size = new 
ConcurrentHashMap<Class<?>, Integer>();
+    private final int averageSizePerField;
+    
+    public DefaultIoEventSizeEstimator() {
+        this(64);
+    }
+    
+    public DefaultIoEventSizeEstimator(int averageSizePerField) {
+        if (averageSizePerField <= 0) {
+            throw new IllegalArgumentException("averageSizePerField: " + 
averageSizePerField);
+        }
+        this.averageSizePerField = averageSizePerField;
+        
+        class2size.put(boolean.class, 4); // Probably an integer.
+        class2size.put(byte.class, 1);
+        class2size.put(char.class, 2);
+        class2size.put(int.class, 4);
+        class2size.put(long.class, 8);
+        class2size.put(float.class, 4);
+        class2size.put(double.class, 8);
+    }
+    
+    public int estimateSize(IoEvent event) {
+        return estimateSize((Object) event) + 
estimateSize(event.getParameter());
+    }
+    
+    private int estimateSize(Object message) {
+        if (message == null) {
+            return 8;
+        }
+
+        if (message instanceof IoBuffer) {
+            return align(46 + ((IoBuffer) message).remaining());
+        }
+        
+        if (message instanceof CharSequence) {
+            return align(38 + (((CharSequence) message).length() << 1));
+        }
+        
+        if (message instanceof Iterable) {
+            int answer = estimateSize(message.getClass());
+            for (Object m: (Iterable<?>) message) {
+                answer += estimateSize(m);
+            }
+            return answer;
+        }
+        
+        return estimateSize(message.getClass());
+    }
+    
+    private int estimateSize(Class<?> clazz) {
+        Integer objectSize = class2size.get(clazz);
+        if (objectSize != null) {
+            return objectSize;
+        }
+        
+        int answer = 8; // Basic overhead.
+        synchronized (class2size) {
+            
+            // Get the rough estimation.
+            for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+                Field[] fields = c.getDeclaredFields();
+                for (Field f: fields) {
+                    if ((f.getModifiers() & Modifier.STATIC) != 0) {
+                        // Ignore static fields.
+                        continue;
+                    }
+                    
+                    Integer fieldSize = class2size.get(f.getType());
+                    if (fieldSize == null) {
+                        answer += averageSizePerField;
+                    } else {
+                        answer += fieldSize;
+                    }
+                }
+            }
+            
+            // Put the intermediate answer to prevent infinite recursion.
+            class2size.put(clazz, answer);
+            
+            // Now include field classes, too.
+            for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+                Field[] fields = c.getDeclaredFields();
+                for (Field f: fields) {
+                    if ((f.getModifiers() & Modifier.STATIC) != 0) {
+                        // Ignore static fields.
+                        continue;
+                    }
+                    
+                    if (!class2size.containsKey(f.getType())) {
+                        // Compensate previous rough estimation
+                        answer += estimateSize(f.getType()) - 
averageSizePerField;
+                    }
+                }
+            }
+            
+            if (answer <= 0) {
+                answer = averageSizePerField;
+            }
+            
+            // Some alignment.
+            answer = align(answer);
+            
+            // Put the final answer.
+            class2size.put(clazz, answer);
+        }
+        
+        return answer;
+    }
+    
+    private static int align(int size) {
+        if (size % 8 != 0) {
+            size /= 8;
+            size ++;
+            size *= 8;
+        }
+        return size;
+    }
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/DefaultIoEventSizeEstimator.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java?rev=593425&r1=593424&r2=593425&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
 Thu Nov  8 22:18:21 2007
@@ -20,11 +20,12 @@
 package org.apache.mina.filter.executor;
 
 import java.util.EventListener;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.mina.common.IoEvent;
 
 /**
- * Listens to all event queue operations occurring in [EMAIL PROTECTED] 
OrderedThreadPoolExecutor}.
+ * Listens to all event queue operations occurring in [EMAIL PROTECTED] 
IoThreadPoolExecutor}.
  * 
  * @author The Apache MINA Project ([EMAIL PROTECTED])
  * @version $Rev$, $Date$
@@ -35,17 +36,17 @@
      * allowed to be offered to the event queue.  The <tt>event</tt> is dropped
      * if <tt>false</tt> is returned.
      */
-    boolean accept(OrderedThreadPoolExecutor executor, IoEvent event);
+    boolean accept(ThreadPoolExecutor executor, IoEvent event);
     
     /**
      * Invoked after the specified <tt>event</tt> has been offered to the
      * event queue.
      */
-    void offered(OrderedThreadPoolExecutor executor, IoEvent event);
+    void offered(ThreadPoolExecutor executor, IoEvent event);
     
     /**
      * Invoked after the specified <tt>event</tt> has been polled from the
      * event queue.
      */
-    void polled(OrderedThreadPoolExecutor executor, IoEvent event);
+    void polled(ThreadPoolExecutor executor, IoEvent event);
 }

Added: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java?rev=593425&view=auto
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
 (added)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
 Thu Nov  8 22:18:21 2007
@@ -0,0 +1,138 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.common.IoEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Throttles incoming events into [EMAIL PROTECTED] OrderedThreadPoolExecutor}.
+ * 
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class IoEventQueueThrottle implements IoEventQueueHandler {
+    
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+    
+    private final IoEventSizeEstimator eventSizeEstimator;
+    private volatile int threshold;
+    
+    private final Object lock = new Object();
+    private final AtomicInteger counter = new AtomicInteger();
+    private int waiters;
+
+    public IoEventQueueThrottle() {
+        this(new DefaultIoEventSizeEstimator(), 65536);
+    }
+    
+    public IoEventQueueThrottle(int threshold) {
+        this(new DefaultIoEventSizeEstimator(), threshold);
+    }
+    
+    public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int 
threshold) {
+        if (eventSizeEstimator == null) {
+            throw new NullPointerException("eventSizeEstimator");
+        }
+        this.eventSizeEstimator = eventSizeEstimator;
+        
+        setThreshold(threshold);
+    }
+    
+    public IoEventSizeEstimator getEventSizeEstimator() {
+        return eventSizeEstimator;
+    }
+    
+    public int getThreshold() {
+        return threshold;
+    }
+
+    public void setThreshold(int threshold) {
+        if (threshold <= 0) {
+            throw new IllegalArgumentException("threshold: " + threshold);
+        }
+        this.threshold = threshold;
+    }
+
+    public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
+        return true;
+    }
+    
+    public void offered(ThreadPoolExecutor executor, IoEvent event) {
+        int eventSize = getEventSizeEstimator().estimateSize(event);
+        int currentCounter = counter.addAndGet(eventSize);
+        logState();
+
+        if (currentCounter >= threshold) {
+            block();
+        }
+    }
+    
+    public void polled(ThreadPoolExecutor executor, IoEvent event) {
+        int eventSize = getEventSizeEstimator().estimateSize(event);
+        int currentCounter = counter.addAndGet(-eventSize);
+        logState();
+
+        if (currentCounter < threshold) {
+            unblock();
+        }
+    }
+
+    private void logState() {
+        if (logger.isDebugEnabled()) {
+            logger.debug(Thread.currentThread().getName() + " state: " + 
counter.get() + " / " + getThreshold());
+        }
+    }
+    
+    protected void block() {
+        if (logger.isDebugEnabled()) {
+            logger.debug(Thread.currentThread().getName() + " blocked: " + 
counter.get() + " >= " + threshold);
+        }
+        
+        synchronized (lock) {
+            while (counter.get() >= threshold) {
+                waiters ++;
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    // Wait uninterruptably.
+                } finally {
+                    waiters --;
+                }
+            }
+        }
+        
+        if (logger.isDebugEnabled()) {
+            logger.debug(Thread.currentThread().getName() + " unblocked: " + 
counter.get() + " < " + threshold);
+        }
+    }
+    
+    protected void unblock() {
+        synchronized (lock) {
+            if (waiters > 0) {
+                lock.notify();
+            }
+        }
+    }
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java?rev=593425&view=auto
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
 (added)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
 Thu Nov  8 22:18:21 2007
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import org.apache.mina.common.IoEvent;
+
+/**
+ * Estimates the amount of memory that the specified [EMAIL PROTECTED] 
IoEvent} occupies
+ * in the current JVM.
+ * 
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public interface IoEventSizeEstimator {
+    int estimateSize(IoEvent event);
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventSizeEstimator.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=593425&r1=593424&r2=593425&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
 Thu Nov  8 22:18:21 2007
@@ -50,11 +50,11 @@
 
     private static final IoSession EXIT_SIGNAL = new DummySession();
     private static final IoEventQueueHandler NOOP_QUEUE_MONITOR = new 
IoEventQueueHandler() {
-        public boolean accept(OrderedThreadPoolExecutor executor, IoEvent 
event) {
+        public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
             return true;
         }
-        public void offered(OrderedThreadPoolExecutor executor, IoEvent event) 
{}
-        public void polled(OrderedThreadPoolExecutor executor, IoEvent event) 
{}
+        public void offered(ThreadPoolExecutor executor, IoEvent event) {}
+        public void polled(ThreadPoolExecutor executor, IoEvent event) {}
     };
 
     private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
@@ -72,6 +72,10 @@
     
     private volatile IoEventQueueHandler queueHandler;
     
+    public OrderedThreadPoolExecutor() {
+        this(16);
+    }
+    
     public OrderedThreadPoolExecutor(int maximumPoolSize) {
         this(0, maximumPoolSize);
     }
@@ -276,27 +280,32 @@
         IoSession s = e.getSession();
         SessionBuffer buf = getSessionBuffer(s);
         Queue<Runnable> queue = buf.queue;
-        boolean offer;
+        boolean offerSession;
+        boolean offeredEvent;
         synchronized (queue) {
-            if (queueHandler.accept(this, e)) {
+            offeredEvent = queueHandler.accept(this, e);
+            if (offeredEvent) {
                 queue.offer(e);
-                queueHandler.offered(this, e);
                 if (buf.processingCompleted) {
                     buf.processingCompleted = false;
-                    offer = true;
+                    offerSession = true;
                 } else {
-                    offer = false;
+                    offerSession = false;
                 }
             } else {
-                offer = false;
+                offerSession = false;
             }
         }
         
-        if (offer) {
+        if (offerSession) {
             waitingSessions.offer(s);
         }
         
         addWorkerIfNecessary();
+        
+        if (offeredEvent) {
+            queueHandler.offered(this, e);
+        }
     }
     
     private void rejectTask(Runnable task) {
@@ -453,7 +462,7 @@
                 
                 if (session == null) {
                     synchronized (workers) {
-                        if (workers.size() >= corePoolSize) {
+                        if (workers.size() > corePoolSize) {
                             // Remove now to prevent duplicate exit.
                             workers.remove(this);
                             break;

Added: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java?rev=593425&view=auto
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
 (added)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
 Thu Nov  8 22:18:21 2007
@@ -0,0 +1,474 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.common.IoEvent;
+
+/**
+ * A [EMAIL PROTECTED] ThreadPoolExecutor} that does not maintain the order of 
[EMAIL PROTECTED] IoEvent}s.
+ * This means more than one event handler methods can be invoked at the same
+ * time with mixed order.  For example, let's assume that messageReceived, 
messageSent,
+ * and sessionClosed events are fired.
+ * <ul>
+ * <li>All event handler methods can be called simultaneously.
+ *     (e.g. messageReceived and messageSent can be invoked at the same 
time.)</li>
+ * <li>The event order can be mixed up.
+ *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
+ *           is invoked.)</li>
+ * </ul>
+ * If you need to maintain the order of events per session, please use
+ * [EMAIL PROTECTED] OrderedThreadPoolExecutor}.
+ * 
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
+
+    private static final Runnable EXIT_SIGNAL = new Runnable() {
+        public void run() {}
+    };
+    private static final IoEventQueueHandler NOOP_QUEUE_MONITOR = new 
IoEventQueueHandler() {
+        public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
+            return true;
+        }
+        public void offered(ThreadPoolExecutor executor, IoEvent event) {}
+        public void polled(ThreadPoolExecutor executor, IoEvent event) {}
+    };
+
+    private final Set<Worker> workers = new HashSet<Worker>();
+    
+    private volatile int corePoolSize;
+    private volatile int maximumPoolSize;
+    private volatile int largestPoolSize;
+    private final AtomicInteger idleWorkers = new AtomicInteger();
+    
+    private long completedTaskCount;
+    private volatile boolean shutdown;
+    
+    private volatile IoEventQueueHandler queueHandler;
+    
+    public UnorderedThreadPoolExecutor() {
+        this(16);
+    }
+    
+    public UnorderedThreadPoolExecutor(int maximumPoolSize) {
+        this(0, maximumPoolSize);
+    }
+    
+    public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
+        this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
+    }
+    
+    public UnorderedThreadPoolExecutor(
+            int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory());
+    }
+    
+    public UnorderedThreadPoolExecutor(
+            int corePoolSize, int maximumPoolSize, 
+            long keepAliveTime, TimeUnit unit,
+            IoEventQueueHandler queueMonitor) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), queueMonitor);
+    }
+
+    public UnorderedThreadPoolExecutor(
+            int corePoolSize, int maximumPoolSize, 
+            long keepAliveTime, TimeUnit unit,
+            ThreadFactory threadFactory) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
threadFactory, null);
+    }
+
+    public UnorderedThreadPoolExecutor(
+            int corePoolSize, int maximumPoolSize, 
+            long keepAliveTime, TimeUnit unit,
+            ThreadFactory threadFactory, IoEventQueueHandler queueMonitor) {
+        super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), 
threadFactory, new AbortPolicy());
+        if (corePoolSize < 0) {
+            throw new IllegalArgumentException("corePoolSize: " + 
corePoolSize);
+        }
+        
+        if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
+            throw new IllegalArgumentException("maximumPoolSize: " + 
maximumPoolSize);
+        }
+        
+        this.corePoolSize = corePoolSize;
+        this.maximumPoolSize = maximumPoolSize;
+        setQueueHandler(queueMonitor);
+    }
+    
+    public IoEventQueueHandler getQueueHandler() {
+        return queueHandler;
+    }
+
+    public void setQueueHandler(IoEventQueueHandler queueHandler) {
+        if (queueHandler == null) {
+            queueHandler = NOOP_QUEUE_MONITOR;
+        }
+        this.queueHandler = queueHandler;
+    }
+
+    @Override
+    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+        // Ignore the request.  It must always be AbortPolicy.
+    }
+
+    private void addWorker() {
+        synchronized (workers) {
+            if (workers.size() >= maximumPoolSize) {
+                return;
+            }
+
+            Worker worker = new Worker();
+            Thread thread = getThreadFactory().newThread(worker);
+            idleWorkers.incrementAndGet();
+            thread.start();
+            workers.add(worker);
+            
+            if (workers.size() > largestPoolSize) {
+                largestPoolSize = workers.size();
+            }
+        }
+    }
+    
+    private void addWorkerIfNecessary() {
+        if (idleWorkers.get() == 0) {
+            synchronized (workers) {
+                if (workers.isEmpty() || idleWorkers.get() == 0) {
+                    addWorker();
+                }
+            }
+        }
+    }
+    
+    private void removeWorker() {
+        synchronized (workers) {
+            if (workers.size() <= corePoolSize) {
+                return;
+            }
+            getQueue().offer(EXIT_SIGNAL);
+        }
+    }
+    
+    @Override
+    public int getMaximumPoolSize() {
+        return maximumPoolSize;
+    }
+    
+    @Override
+    public void setMaximumPoolSize(int maximumPoolSize) {
+        synchronized (workers) {
+            if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
+                throw new IllegalArgumentException("maximumPoolSize: " + 
maximumPoolSize);
+            }
+            
+            if (this.maximumPoolSize > maximumPoolSize) {
+                for (int i = this.maximumPoolSize - maximumPoolSize; i > 0; i 
--) {
+                    removeWorker();
+                }
+            }
+            this.maximumPoolSize = maximumPoolSize;
+        }
+    }
+    
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException {
+        
+        long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
+        
+        synchronized (workers) {
+            while (!isTerminated()) {
+                long waitTime = deadline - System.currentTimeMillis();
+                if (waitTime <= 0) {
+                    break;
+                }
+                
+                workers.wait(waitTime);
+            }
+        }
+        return isTerminated();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return shutdown;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        if (!shutdown) {
+            return false;
+        }
+        
+        synchronized (workers) {
+            return workers.isEmpty();
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        if (shutdown) {
+            return;
+        }
+        
+        shutdown = true;
+
+        synchronized (workers) {
+            for (int i = workers.size(); i > 0; i --) {
+                getQueue().offer(EXIT_SIGNAL);
+            }
+        }
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        shutdown();
+        
+        List<Runnable> answer = new ArrayList<Runnable>();
+        Runnable task;
+        while ((task = getQueue().poll()) != null) {
+            if (task == EXIT_SIGNAL) {
+                getQueue().offer(EXIT_SIGNAL);
+                Thread.yield(); // Let others take the signal.
+                continue;
+            }
+            
+            answer.add(task);
+        }
+        
+        return answer;
+    }
+
+    @Override
+    public void execute(Runnable task) {
+        if (shutdown) {
+            rejectTask(task);
+        }
+
+        checkTaskType(task);
+        
+        IoEvent e = (IoEvent) task;
+        boolean offeredEvent = queueHandler.accept(this, e);
+        if (offeredEvent) {
+            getQueue().offer(e);
+        }
+        
+        addWorkerIfNecessary();
+        
+        if (offeredEvent) {
+            queueHandler.offered(this, e);
+        }
+    }
+    
+    private void rejectTask(Runnable task) {
+        getRejectedExecutionHandler().rejectedExecution(task, this);
+    }
+    
+    private void checkTaskType(Runnable task) {
+        if (!(task instanceof IoEvent)) {
+            throw new IllegalArgumentException("task must be an IoEvent or its 
subclass.");
+        }
+    }
+
+    @Override
+    public int getActiveCount() {
+        synchronized (workers) {
+            return workers.size() - idleWorkers.get();
+        }
+    }
+
+    @Override
+    public long getCompletedTaskCount() {
+        synchronized (workers) {
+            long answer = completedTaskCount;
+            for (Worker w: workers) {
+                answer += w.completedTaskCount;
+            }
+            
+            return answer;
+        }
+    }
+
+    @Override
+    public int getLargestPoolSize() {
+        return largestPoolSize;
+    }
+
+    @Override
+    public int getPoolSize() {
+        synchronized (workers) {
+            return workers.size();
+        }
+    }
+
+    @Override
+    public long getTaskCount() {
+        return getCompletedTaskCount();
+    }
+
+    @Override
+    public boolean isTerminating() {
+        synchronized (workers) {
+            return isShutdown() && !isTerminated();
+        }
+    }
+
+    @Override
+    public int prestartAllCoreThreads() {
+        int answer = 0;
+        synchronized (workers) {
+            for (int i = corePoolSize - workers.size() ; i > 0; i --) {
+                addWorker();
+                answer ++;
+            }
+        }
+        return answer;
+    }
+
+    @Override
+    public boolean prestartCoreThread() {
+        synchronized (workers) {
+            if (workers.size() < corePoolSize) {
+                addWorker();
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+    
+    @Override
+    public int getCorePoolSize() {
+        return corePoolSize;
+    }
+
+    @Override
+    public void setCorePoolSize(int corePoolSize) {
+        if (corePoolSize < 0) {
+            throw new IllegalArgumentException("corePoolSize: " + 
corePoolSize);
+        }
+        
+        synchronized (workers) {
+            if (this.corePoolSize > corePoolSize) {
+                for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
+                    removeWorker();
+                }
+            }
+            this.corePoolSize = corePoolSize;
+        }
+    }
+    
+    private class Worker implements Runnable {
+        
+        private volatile long completedTaskCount;
+        private Thread thread;
+        
+        public void run() {
+            thread = Thread.currentThread();
+            
+            for (;;) {
+                Runnable task = fetchTask();
+                
+                idleWorkers.decrementAndGet();
+                
+                if (task == null) {
+                    synchronized (workers) {
+                        if (workers.size() > corePoolSize) {
+                            // Remove now to prevent duplicate exit.
+                            workers.remove(this);
+                            break;
+                        }
+                    }
+                }
+                
+                if (task == EXIT_SIGNAL) {
+                    break;
+                }
+                
+                queueHandler.polled(UnorderedThreadPoolExecutor.this, 
(IoEvent) task);
+                try {
+                    runTask(task);
+                } finally {
+                    idleWorkers.incrementAndGet();
+                }
+            }
+            
+            synchronized (workers) {
+                workers.remove(this);
+                UnorderedThreadPoolExecutor.this.completedTaskCount += 
completedTaskCount;
+                workers.notifyAll();
+            }
+        }
+
+        private Runnable fetchTask() {
+            Runnable task = null;
+            long currentTime = System.currentTimeMillis();
+            long deadline = currentTime + 
getKeepAliveTime(TimeUnit.MILLISECONDS);
+            for (;;) {
+                try {
+                    long waitTime = deadline - currentTime;
+                    if (waitTime <= 0) {
+                        break;
+                    }
+
+                    try {
+                        task = getQueue().poll(waitTime, 
TimeUnit.MILLISECONDS);
+                        break;
+                    } finally {
+                        if (task == null) {
+                            currentTime = System.currentTimeMillis();
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // Ignore.
+                    continue;
+                }
+            }
+            return task;
+        }
+
+        private void runTask(Runnable task) {
+            beforeExecute(thread, task);
+            boolean ran = false;
+            try {
+                task.run();
+                ran = true;
+                afterExecute(task, null);
+                completedTaskCount ++;
+            } catch (RuntimeException e) {
+                if (!ran)
+                    afterExecute(task, e);
+                throw e;
+            }
+        }
+    }
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to