Repository: mina
Updated Branches:
  refs/heads/2.1.0 c95171c88 -> f858147e7


Removed a spurious printStackTrace (DIRMINA-1092)

Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/bd7ee98f
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/bd7ee98f
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/bd7ee98f

Branch: refs/heads/2.1.0
Commit: bd7ee98f1ec49f462ff140f3662910395e934281
Parents: 56ca189
Author: Emmanuel Lécharny <[email protected]>
Authored: Mon Jul 30 18:21:49 2018 +0200
Committer: Emmanuel Lécharny <[email protected]>
Committed: Mon Aug 27 17:50:34 2018 +0200

----------------------------------------------------------------------
 .../polling/AbstractPollingIoProcessor.java     |   2 -
 .../executor/PriorityThreadPoolExecutor.java    | 917 ++++++++++---------
 2 files changed, 462 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/bd7ee98f/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
----------------------------------------------------------------------
diff --git 
a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
 
b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
index 2ee0b96..78807ee 100644
--- 
a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
+++ 
b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
@@ -1106,8 +1106,6 @@ public abstract class AbstractPollingIoProcessor<S 
extends AbstractIoSession> im
                 try {
                     localWrittenBytes = write(session, buf, length);
                 } catch (IOException ioe) {
-                    ioe.printStackTrace();
-
                     // We have had an issue while trying to send data to the
                     // peer : let's close the session.
                     buf.free();

http://git-wip-us.apache.org/repos/asf/mina/blob/bd7ee98f/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
 
b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
index dd6b6e1..b0ace13 100644
--- 
a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
+++ 
b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
@@ -65,8 +65,7 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
     private static final SessionEntry EXIT_SIGNAL = new SessionEntry(new 
DummySession(), null);
 
     /**
-     * A key stored into the session's attribute for the event tasks being
-     * queued
+     * A key stored into the session's attribute for the event tasks being 
queued
      */
     private static final AttributeKey TASKS_QUEUE = new 
AttributeKey(PriorityThreadPoolExecutor.class, "tasksQueue");
 
@@ -88,45 +87,49 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
     private final Comparator<IoSession> comparator;
 
     /**
-     * Creates a default ThreadPool, with default values : - minimum pool size
-     * is 0 - maximum pool size is 16 - keepAlive set to 30 seconds - A default
+     * Creates a default ThreadPool, with default values : - minimum pool size 
is 0
+     * - maximum pool size is 16 - keepAlive set to 30 seconds - A default
      * ThreadFactory - All events are accepted
      */
     public PriorityThreadPoolExecutor() {
-       this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, 
null);
+        this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
+                Executors.defaultThreadFactory(), null, null);
     }
 
     /**
-     * Creates a default ThreadPool, with default values : - minimum pool size
-     * is 0 - maximum pool size is 16 - keepAlive set to 30 seconds - A default
+     * Creates a default ThreadPool, with default values : - minimum pool size 
is 0
+     * - maximum pool size is 16 - keepAlive set to 30 seconds - A default
      * ThreadFactory - All events are accepted
      */
     public PriorityThreadPoolExecutor(Comparator<IoSession> comparator) {
-       this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, 
comparator);
+        this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
+                Executors.defaultThreadFactory(), null, comparator);
     }
 
     /**
-     * Creates a default ThreadPool, with default values : - minimum pool size
-     * is 0 - keepAlive set to 30 seconds - A default ThreadFactory - All 
events
-     * are accepted
+     * Creates a default ThreadPool, with default values : - minimum pool size 
is 0
+     * - keepAlive set to 30 seconds - A default ThreadFactory - All events are
+     * accepted
      *
      * @param maximumPoolSize
      *            The maximum pool size
      */
     public PriorityThreadPoolExecutor(int maximumPoolSize) {
-       this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, 
null);
+        this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
+                Executors.defaultThreadFactory(), null, null);
     }
 
     /**
-     * Creates a default ThreadPool, with default values : - minimum pool size
-     * is 0 - keepAlive set to 30 seconds - A default ThreadFactory - All 
events
-     * are accepted
+     * Creates a default ThreadPool, with default values : - minimum pool size 
is 0
+     * - keepAlive set to 30 seconds - A default ThreadFactory - All events are
+     * accepted
      *
      * @param maximumPoolSize
      *            The maximum pool size
      */
     public PriorityThreadPoolExecutor(int maximumPoolSize, 
Comparator<IoSession> comparator) {
-       this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, 
comparator);
+        this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
+                Executors.defaultThreadFactory(), null, comparator);
     }
 
     /**
@@ -139,12 +142,13 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      *            The maximum pool size
      */
     public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
-       this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, 
TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, null);
+        this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, 
TimeUnit.SECONDS, Executors.defaultThreadFactory(),
+                null, null);
     }
 
     /**
-     * Creates a default ThreadPool, with default values : - A default
-     * ThreadFactory - All events are accepted
+     * Creates a default ThreadPool, with default values : - A default 
ThreadFactory
+     * - All events are accepted
      *
      * @param corePoolSize
      *            The initial pool sizePoolSize
@@ -156,12 +160,11 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      *            Time unit used for the keepAlive value
      */
     public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit) {
-       this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), null, null);
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), null, null);
     }
 
     /**
-     * Creates a default ThreadPool, with default values : - A default
-     * ThreadFactory
+     * Creates a default ThreadPool, with default values : - A default 
ThreadFactory
      *
      * @param corePoolSize
      *            The initial pool sizePoolSize
@@ -174,13 +177,14 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      * @param eventQueueHandler
      *            The queue used to store events
      */
-    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, IoEventQueueHandler eventQueueHandler) {
-       this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), eventQueueHandler, null);
+    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit,
+            IoEventQueueHandler eventQueueHandler) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), eventQueueHandler,
+                null);
     }
 
     /**
-     * Creates a default ThreadPool, with default values : - A default
-     * ThreadFactory
+     * Creates a default ThreadPool, with default values : - A default 
ThreadFactory
      *
      * @param corePoolSize
      *            The initial pool sizePoolSize
@@ -193,8 +197,9 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      * @param threadFactory
      *            The factory used to create threads
      */
-    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
-       this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, 
null, null);
+    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit,
+            ThreadFactory threadFactory) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
threadFactory, null, null);
     }
 
     /**
@@ -213,66 +218,68 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      * @param eventQueueHandler
      *            The queue used to store events
      */
-    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, 
IoEventQueueHandler eventQueueHandler, Comparator<IoSession> comparator) {
-       // We have to initialize the pool with default values (0 and 1) in order
-       // to
-       // handle the exception in a better way. We can't add a try {} catch()
-       // {}
-       // around the super() call.
-       super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new 
SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
+    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit,
+            ThreadFactory threadFactory, IoEventQueueHandler 
eventQueueHandler, Comparator<IoSession> comparator) {
+        // We have to initialize the pool with default values (0 and 1) in 
order
+        // to
+        // handle the exception in a better way. We can't add a try {} catch()
+        // {}
+        // around the super() call.
+        super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new 
SynchronousQueue<Runnable>(), threadFactory,
+                new AbortPolicy());
 
-       if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
-           throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
-       }
+        if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
+            throw new IllegalArgumentException("corePoolSize: " + 
corePoolSize);
+        }
 
-       if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
-           throw new IllegalArgumentException("maximumPoolSize: " + 
maximumPoolSize);
-       }
+        if ((maximumPoolSize <= 0) || (maximumPoolSize < corePoolSize)) {
+            throw new IllegalArgumentException("maximumPoolSize: " + 
maximumPoolSize);
+        }
 
-       // Now, we can setup the pool sizes
-       super.setCorePoolSize(corePoolSize);
-       super.setMaximumPoolSize(maximumPoolSize);
+        // Now, we can setup the pool sizes
+        super.setMaximumPoolSize(maximumPoolSize);
+        super.setCorePoolSize(corePoolSize);
 
-       // The queueHandler might be null.
-       if (eventQueueHandler == null) {
-           this.eventQueueHandler = IoEventQueueHandler.NOOP;
-       } else {
-           this.eventQueueHandler = eventQueueHandler;
-       }
+        // The queueHandler might be null.
+        if (eventQueueHandler == null) {
+            this.eventQueueHandler = IoEventQueueHandler.NOOP;
+        } else {
+            this.eventQueueHandler = eventQueueHandler;
+        }
 
-       // The comparator can be null.
-       this.comparator = comparator;
+        // The comparator can be null.
+        this.comparator = comparator;
 
-       if (this.comparator == null) {
-           this.waitingSessions = new LinkedBlockingQueue<>();
-       } else {
-           this.waitingSessions = new PriorityBlockingQueue<>();
-       }
+        if (this.comparator == null) {
+            this.waitingSessions = new LinkedBlockingQueue<>();
+        } else {
+            this.waitingSessions = new PriorityBlockingQueue<>();
+        }
     }
 
     /**
      * Get the session's tasks queue.
      */
     private SessionQueue getSessionTasksQueue(IoSession session) {
-       SessionQueue queue = (SessionQueue) session.getAttribute(TASKS_QUEUE);
+        SessionQueue queue = (SessionQueue) session.getAttribute(TASKS_QUEUE);
 
-       if (queue == null) {
-           queue = new SessionQueue();
-           SessionQueue oldQueue = (SessionQueue) 
session.setAttributeIfAbsent(TASKS_QUEUE, queue);
+        if (queue == null) {
+            queue = new SessionQueue();
+            SessionQueue oldQueue = (SessionQueue) 
session.setAttributeIfAbsent(TASKS_QUEUE, queue);
 
-           if (oldQueue != null) {
-               queue = oldQueue;
-           }
-       }
+            if (oldQueue != null) {
+                queue = oldQueue;
+            }
+        }
 
-       return queue;
+        return queue;
     }
 
     /**
      * @return The associated queue handler.
      */
     public IoEventQueueHandler getQueueHandler() {
-       return eventQueueHandler;
+        return eventQueueHandler;
     }
 
     /**
@@ -280,56 +287,56 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
-       // Ignore the request. It must always be AbortPolicy.
+        // Ignore the request. It must always be AbortPolicy.
     }
 
     /**
-     * Add a new thread to execute a task, if needed and possible. It depends 
on
-     * the current pool size. If it's full, we do nothing.
+     * Add a new thread to execute a task, if needed and possible. It depends 
on the
+     * current pool size. If it's full, we do nothing.
      */
     private void addWorker() {
-       synchronized (workers) {
-           if (workers.size() >= super.getMaximumPoolSize()) {
-               return;
-           }
+        synchronized (workers) {
+            if (workers.size() >= super.getMaximumPoolSize()) {
+                return;
+            }
 
-           // Create a new worker, and add it to the thread pool
-           Worker worker = new Worker();
-           Thread thread = getThreadFactory().newThread(worker);
+            // Create a new worker, and add it to the thread pool
+            Worker worker = new Worker();
+            Thread thread = getThreadFactory().newThread(worker);
 
-           // As we have added a new thread, it's considered as idle.
-           idleWorkers.incrementAndGet();
+            // As we have added a new thread, it's considered as idle.
+            idleWorkers.incrementAndGet();
 
-           // Now, we can start it.
-           thread.start();
-           workers.add(worker);
+            // Now, we can start it.
+            thread.start();
+            workers.add(worker);
 
-           if (workers.size() > largestPoolSize) {
-               largestPoolSize = workers.size();
-           }
-       }
+            if (workers.size() > largestPoolSize) {
+                largestPoolSize = workers.size();
+            }
+        }
     }
 
     /**
      * Add a new Worker only if there are no idle worker.
      */
     private void addWorkerIfNecessary() {
-       if (idleWorkers.get() == 0) {
-           synchronized (workers) {
-               if (workers.isEmpty() || (idleWorkers.get() == 0)) {
-                   addWorker();
-               }
-           }
-       }
+        if (idleWorkers.get() == 0) {
+            synchronized (workers) {
+                if (workers.isEmpty() || (idleWorkers.get() == 0)) {
+                    addWorker();
+                }
+            }
+        }
     }
 
     private void removeWorker() {
-       synchronized (workers) {
-           if (workers.size() <= super.getCorePoolSize()) {
-               return;
-           }
-           waitingSessions.offer(EXIT_SIGNAL);
-       }
+        synchronized (workers) {
+            if (workers.size() <= super.getCorePoolSize()) {
+                return;
+            }
+            waitingSessions.offer(EXIT_SIGNAL);
+        }
     }
 
     /**
@@ -337,18 +344,18 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public void setMaximumPoolSize(int maximumPoolSize) {
-       if ((maximumPoolSize <= 0) || (maximumPoolSize < 
super.getCorePoolSize())) {
-           throw new IllegalArgumentException("maximumPoolSize: " + 
maximumPoolSize);
-       }
+        if ((maximumPoolSize <= 0) || (maximumPoolSize < 
super.getCorePoolSize())) {
+            throw new IllegalArgumentException("maximumPoolSize: " + 
maximumPoolSize);
+        }
 
-       synchronized (workers) {
-           super.setMaximumPoolSize(maximumPoolSize);
-           int difference = workers.size() - maximumPoolSize;
-           while (difference > 0) {
-               removeWorker();
-               --difference;
-           }
-       }
+        synchronized (workers) {
+            super.setMaximumPoolSize(maximumPoolSize);
+            int difference = workers.size() - maximumPoolSize;
+            while (difference > 0) {
+                removeWorker();
+                --difference;
+            }
+        }
     }
 
     /**
@@ -357,19 +364,19 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
     @Override
     public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
 
-       long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
+        long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
 
-       synchronized (workers) {
-           while (!isTerminated()) {
-               long waitTime = deadline - System.currentTimeMillis();
-               if (waitTime <= 0) {
-                   break;
-               }
+        synchronized (workers) {
+            while (!isTerminated()) {
+                long waitTime = deadline - System.currentTimeMillis();
+                if (waitTime <= 0) {
+                    break;
+                }
 
-               workers.wait(waitTime);
-           }
-       }
-       return isTerminated();
+                workers.wait(waitTime);
+            }
+        }
+        return isTerminated();
     }
 
     /**
@@ -377,7 +384,7 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public boolean isShutdown() {
-       return shutdown;
+        return shutdown;
     }
 
     /**
@@ -385,13 +392,13 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public boolean isTerminated() {
-       if (!shutdown) {
-           return false;
-       }
+        if (!shutdown) {
+            return false;
+        }
 
-       synchronized (workers) {
-           return workers.isEmpty();
-       }
+        synchronized (workers) {
+            return workers.isEmpty();
+        }
     }
 
     /**
@@ -399,17 +406,17 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public void shutdown() {
-       if (shutdown) {
-           return;
-       }
+        if (shutdown) {
+            return;
+        }
 
-       shutdown = true;
+        shutdown = true;
 
-       synchronized (workers) {
-           for (int i = workers.size(); i > 0; i--) {
-               waitingSessions.offer(EXIT_SIGNAL);
-           }
-       }
+        synchronized (workers) {
+            for (int i = workers.size(); i > 0; i--) {
+                waitingSessions.offer(EXIT_SIGNAL);
+            }
+        }
     }
 
     /**
@@ -417,53 +424,53 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public List<Runnable> shutdownNow() {
-       shutdown();
+        shutdown();
 
-       List<Runnable> answer = new ArrayList<>();
-       SessionEntry entry;
+        List<Runnable> answer = new ArrayList<>();
+        SessionEntry entry;
 
-       while ((entry = waitingSessions.poll()) != null) {
-           if (entry == EXIT_SIGNAL) {
-               waitingSessions.offer(EXIT_SIGNAL);
-               Thread.yield(); // Let others take the signal.
-               continue;
-           }
+        while ((entry = waitingSessions.poll()) != null) {
+            if (entry == EXIT_SIGNAL) {
+                waitingSessions.offer(EXIT_SIGNAL);
+                Thread.yield(); // Let others take the signal.
+                continue;
+            }
 
-           SessionQueue sessionTasksQueue = (SessionQueue) 
entry.getSession().getAttribute(TASKS_QUEUE);
+            SessionQueue sessionTasksQueue = (SessionQueue) 
entry.getSession().getAttribute(TASKS_QUEUE);
 
-           synchronized (sessionTasksQueue.tasksQueue) {
+            synchronized (sessionTasksQueue.tasksQueue) {
 
-               for (Runnable task : sessionTasksQueue.tasksQueue) {
-                   getQueueHandler().polled(this, (IoEvent) task);
-                   answer.add(task);
-               }
+                for (Runnable task : sessionTasksQueue.tasksQueue) {
+                    getQueueHandler().polled(this, (IoEvent) task);
+                    answer.add(task);
+                }
 
-               sessionTasksQueue.tasksQueue.clear();
-           }
-       }
+                sessionTasksQueue.tasksQueue.clear();
+            }
+        }
 
-       return answer;
+        return answer;
     }
 
     /**
      * A Helper class used to print the list of events being queued.
      */
     private void print(Queue<Runnable> queue, IoEvent event) {
-       StringBuilder sb = new StringBuilder();
-       sb.append("Adding event ").append(event.getType()).append(" to session 
").append(event.getSession().getId());
-       boolean first = true;
-       sb.append("\nQueue : [");
-       for (Runnable elem : queue) {
-           if (first) {
-               first = false;
-           } else {
-               sb.append(", ");
-           }
+        StringBuilder sb = new StringBuilder();
+        sb.append("Adding event ").append(event.getType()).append(" to session 
").append(event.getSession().getId());
+        boolean first = true;
+        sb.append("\nQueue : [");
+        for (Runnable elem : queue) {
+            if (first) {
+                first = false;
+            } else {
+                sb.append(", ");
+            }
 
-           sb.append(((IoEvent) elem).getType()).append(", ");
-       }
-       sb.append("]\n");
-       LOGGER.debug(sb.toString());
+            sb.append(((IoEvent) elem).getType()).append(", ");
+        }
+        sb.append("]\n");
+        LOGGER.debug(sb.toString());
     }
 
     /**
@@ -471,72 +478,72 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public void execute(Runnable task) {
-       if (shutdown) {
-           rejectTask(task);
-       }
+        if (shutdown) {
+            rejectTask(task);
+        }
 
-       // Check that it's a IoEvent task
-       checkTaskType(task);
+        // Check that it's a IoEvent task
+        checkTaskType(task);
 
-       IoEvent event = (IoEvent) task;
+        IoEvent event = (IoEvent) task;
 
-       // Get the associated session
-       IoSession session = event.getSession();
+        // Get the associated session
+        IoSession session = event.getSession();
 
-       // Get the session's queue of events
-       SessionQueue sessionTasksQueue = getSessionTasksQueue(session);
-       Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
+        // Get the session's queue of events
+        SessionQueue sessionTasksQueue = getSessionTasksQueue(session);
+        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
 
-       boolean offerSession;
+        boolean offerSession;
 
-       // propose the new event to the event queue handler. If we
-       // use a throttle queue handler, the message may be rejected
-       // if the maximum size has been reached.
-       boolean offerEvent = eventQueueHandler.accept(this, event);
+        // propose the new event to the event queue handler. If we
+        // use a throttle queue handler, the message may be rejected
+        // if the maximum size has been reached.
+        boolean offerEvent = eventQueueHandler.accept(this, event);
 
-       if (offerEvent) {
-           // Ok, the message has been accepted
-           synchronized (tasksQueue) {
-               // Inject the event into the executor taskQueue
-               tasksQueue.offer(event);
+        if (offerEvent) {
+            // Ok, the message has been accepted
+            synchronized (tasksQueue) {
+                // Inject the event into the executor taskQueue
+                tasksQueue.offer(event);
 
-               if (sessionTasksQueue.processingCompleted) {
-                   sessionTasksQueue.processingCompleted = false;
-                   offerSession = true;
-               } else {
-                   offerSession = false;
-               }
+                if (sessionTasksQueue.processingCompleted) {
+                    sessionTasksQueue.processingCompleted = false;
+                    offerSession = true;
+                } else {
+                    offerSession = false;
+                }
 
-               if (LOGGER.isDebugEnabled()) {
-                   print(tasksQueue, event);
-               }
-           }
-       } else {
-           offerSession = false;
-       }
+                if (LOGGER.isDebugEnabled()) {
+                    print(tasksQueue, event);
+                }
+            }
+        } else {
+            offerSession = false;
+        }
 
-       if (offerSession) {
-           // As the tasksQueue was empty, the task has been executed
-           // immediately, so we can move the session to the queue
-           // of sessions waiting for completion.
-           waitingSessions.offer(new SessionEntry(session, comparator));
-       }
+        if (offerSession) {
+            // As the tasksQueue was empty, the task has been executed
+            // immediately, so we can move the session to the queue
+            // of sessions waiting for completion.
+            waitingSessions.offer(new SessionEntry(session, comparator));
+        }
 
-       addWorkerIfNecessary();
+        addWorkerIfNecessary();
 
-       if (offerEvent) {
-           eventQueueHandler.offered(this, event);
-       }
+        if (offerEvent) {
+            eventQueueHandler.offered(this, event);
+        }
     }
 
     private void rejectTask(Runnable task) {
-       getRejectedExecutionHandler().rejectedExecution(task, this);
+        getRejectedExecutionHandler().rejectedExecution(task, this);
     }
 
     private void checkTaskType(Runnable task) {
-       if (!(task instanceof IoEvent)) {
-           throw new IllegalArgumentException("task must be an IoEvent or its 
subclass.");
-       }
+        if (!(task instanceof IoEvent)) {
+            throw new IllegalArgumentException("task must be an IoEvent or its 
subclass.");
+        }
     }
 
     /**
@@ -544,9 +551,9 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public int getActiveCount() {
-       synchronized (workers) {
-           return workers.size() - idleWorkers.get();
-       }
+        synchronized (workers) {
+            return workers.size() - idleWorkers.get();
+        }
     }
 
     /**
@@ -554,14 +561,14 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public long getCompletedTaskCount() {
-       synchronized (workers) {
-           long answer = completedTaskCount;
-           for (Worker w : workers) {
-               answer += w.completedTaskCount.get();
-           }
+        synchronized (workers) {
+            long answer = completedTaskCount;
+            for (Worker w : workers) {
+                answer += w.completedTaskCount.get();
+            }
 
-           return answer;
-       }
+            return answer;
+        }
     }
 
     /**
@@ -569,7 +576,7 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public int getLargestPoolSize() {
-       return largestPoolSize;
+        return largestPoolSize;
     }
 
     /**
@@ -577,9 +584,9 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public int getPoolSize() {
-       synchronized (workers) {
-           return workers.size();
-       }
+        synchronized (workers) {
+            return workers.size();
+        }
     }
 
     /**
@@ -587,7 +594,7 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public long getTaskCount() {
-       return getCompletedTaskCount();
+        return getCompletedTaskCount();
     }
 
     /**
@@ -595,9 +602,9 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public boolean isTerminating() {
-       synchronized (workers) {
-           return isShutdown() && !isTerminated();
-       }
+        synchronized (workers) {
+            return isShutdown() && !isTerminated();
+        }
     }
 
     /**
@@ -605,14 +612,14 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public int prestartAllCoreThreads() {
-       int answer = 0;
-       synchronized (workers) {
-           for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) {
-               addWorker();
-               answer++;
-           }
-       }
-       return answer;
+        int answer = 0;
+        synchronized (workers) {
+            for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) 
{
+                addWorker();
+                answer++;
+            }
+        }
+        return answer;
     }
 
     /**
@@ -620,14 +627,14 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public boolean prestartCoreThread() {
-       synchronized (workers) {
-           if (workers.size() < super.getCorePoolSize()) {
-               addWorker();
-               return true;
-           } else {
-               return false;
-           }
-       }
+        synchronized (workers) {
+            if (workers.size() < super.getCorePoolSize()) {
+                addWorker();
+                return true;
+            } else {
+                return false;
+            }
+        }
     }
 
     /**
@@ -635,7 +642,7 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public BlockingQueue<Runnable> getQueue() {
-       throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException();
     }
 
     /**
@@ -643,7 +650,7 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public void purge() {
-       // Nothing to purge in this implementation.
+        // Nothing to purge in this implementation.
     }
 
     /**
@@ -651,27 +658,27 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public boolean remove(Runnable task) {
-       checkTaskType(task);
-       IoEvent event = (IoEvent) task;
-       IoSession session = event.getSession();
-       SessionQueue sessionTasksQueue = (SessionQueue) 
session.getAttribute(TASKS_QUEUE);
+        checkTaskType(task);
+        IoEvent event = (IoEvent) task;
+        IoSession session = event.getSession();
+        SessionQueue sessionTasksQueue = (SessionQueue) 
session.getAttribute(TASKS_QUEUE);
 
-       if (sessionTasksQueue == null) {
-           return false;
-       }
+        if (sessionTasksQueue == null) {
+            return false;
+        }
 
-       boolean removed;
-       Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
+        boolean removed;
+        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
 
-       synchronized (tasksQueue) {
-           removed = tasksQueue.remove(task);
-       }
+        synchronized (tasksQueue) {
+            removed = tasksQueue.remove(task);
+        }
 
-       if (removed) {
-           getQueueHandler().polled(this, event);
-       }
+        if (removed) {
+            getQueueHandler().polled(this, event);
+        }
 
-       return removed;
+        return removed;
     }
 
     /**
@@ -679,141 +686,141 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      */
     @Override
     public void setCorePoolSize(int corePoolSize) {
-       if (corePoolSize < 0) {
-           throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
-       }
-       if (corePoolSize > super.getMaximumPoolSize()) {
-           throw new IllegalArgumentException("corePoolSize exceeds 
maximumPoolSize");
-       }
-
-       synchronized (workers) {
-           if (super.getCorePoolSize() > corePoolSize) {
-               for (int i = super.getCorePoolSize() - corePoolSize; i > 0; 
i--) {
-                   removeWorker();
-               }
-           }
-           super.setCorePoolSize(corePoolSize);
-       }
+        if (corePoolSize < 0) {
+            throw new IllegalArgumentException("corePoolSize: " + 
corePoolSize);
+        }
+        if (corePoolSize > super.getMaximumPoolSize()) {
+            throw new IllegalArgumentException("corePoolSize exceeds 
maximumPoolSize");
+        }
+
+        synchronized (workers) {
+            if (super.getCorePoolSize() > corePoolSize) {
+                for (int i = super.getCorePoolSize() - corePoolSize; i > 0; 
i--) {
+                    removeWorker();
+                }
+            }
+            super.setCorePoolSize(corePoolSize);
+        }
     }
 
     private class Worker implements Runnable {
 
-       private AtomicLong completedTaskCount = new AtomicLong(0);
-
-       private Thread thread;
-
-       /**
-        * @inheritedDoc
-        */
-       @Override
-       public void run() {
-           thread = Thread.currentThread();
-
-           try {
-               for (;;) {
-                   IoSession session = fetchSession();
-
-                   idleWorkers.decrementAndGet();
-
-                   if (session == null) {
-                       synchronized (workers) {
-                           if (workers.size() > getCorePoolSize()) {
-                               // Remove now to prevent duplicate exit.
-                               workers.remove(this);
-                               break;
-                           }
-                       }
-                   }
-
-                   if (session == EXIT_SIGNAL) {
-                       break;
-                   }
-
-                   try {
-                       if (session != null) {
-                           runTasks(getSessionTasksQueue(session));
-                       }
-                   } finally {
-                       idleWorkers.incrementAndGet();
-                   }
-               }
-           } finally {
-               synchronized (workers) {
-                   workers.remove(this);
-                   PriorityThreadPoolExecutor.this.completedTaskCount += 
completedTaskCount.get();
-                   workers.notifyAll();
-               }
-           }
-       }
-
-       private IoSession fetchSession() {
-           SessionEntry entry = null;
-           long currentTime = System.currentTimeMillis();
-           long deadline = currentTime + 
getKeepAliveTime(TimeUnit.MILLISECONDS);
-
-           for (;;) {
-               try {
-                   long waitTime = deadline - currentTime;
-
-                   if (waitTime <= 0) {
-                       break;
-                   }
-
-                   try {
-                       entry = waitingSessions.poll(waitTime, 
TimeUnit.MILLISECONDS);
-                       break;
-                   } finally {
-                       if (entry != null) {
-                           currentTime = System.currentTimeMillis();
-                       }
-                   }
-               } catch (InterruptedException e) {
-                   // Ignore.
-                   continue;
-               }
-           }
-
-           if (entry != null) {
-               return entry.getSession();
-           }
-           return null;
-       }
-
-       private void runTasks(SessionQueue sessionTasksQueue) {
-           for (;;) {
-               Runnable task;
-               Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
-
-               synchronized (tasksQueue) {
-                   task = tasksQueue.poll();
-
-                   if (task == null) {
-                       sessionTasksQueue.processingCompleted = true;
-                       break;
-                   }
-               }
-
-               eventQueueHandler.polled(PriorityThreadPoolExecutor.this, 
(IoEvent) task);
-
-               runTask(task);
-           }
-       }
-
-       private void runTask(Runnable task) {
-           beforeExecute(thread, task);
-           boolean ran = false;
-           try {
-               task.run();
-               ran = true;
-               afterExecute(task, null);
-               completedTaskCount.incrementAndGet();
-           } catch (RuntimeException e) {
-               if (!ran) {
-                   afterExecute(task, e);
-               }
-               throw e;
-           }
-       }
+        private AtomicLong completedTaskCount = new AtomicLong(0);
+
+        private Thread thread;
+
+        /**
+         * @inheritedDoc
+         */
+        @Override
+        public void run() {
+            thread = Thread.currentThread();
+
+            try {
+                for (;;) {
+                    IoSession session = fetchSession();
+
+                    idleWorkers.decrementAndGet();
+
+                    if (session == null) {
+                        synchronized (workers) {
+                            if (workers.size() > getCorePoolSize()) {
+                                // Remove now to prevent duplicate exit.
+                                workers.remove(this);
+                                break;
+                            }
+                        }
+                    }
+
+                    if (session == EXIT_SIGNAL) {
+                        break;
+                    }
+
+                    try {
+                        if (session != null) {
+                            runTasks(getSessionTasksQueue(session));
+                        }
+                    } finally {
+                        idleWorkers.incrementAndGet();
+                    }
+                }
+            } finally {
+                synchronized (workers) {
+                    workers.remove(this);
+                    PriorityThreadPoolExecutor.this.completedTaskCount += 
completedTaskCount.get();
+                    workers.notifyAll();
+                }
+            }
+        }
+
+        private IoSession fetchSession() {
+            SessionEntry entry = null;
+            long currentTime = System.currentTimeMillis();
+            long deadline = currentTime + 
getKeepAliveTime(TimeUnit.MILLISECONDS);
+
+            for (;;) {
+                try {
+                    long waitTime = deadline - currentTime;
+
+                    if (waitTime <= 0) {
+                        break;
+                    }
+
+                    try {
+                        entry = waitingSessions.poll(waitTime, 
TimeUnit.MILLISECONDS);
+                        break;
+                    } finally {
+                        if (entry != null) {
+                            currentTime = System.currentTimeMillis();
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // Ignore.
+                    continue;
+                }
+            }
+
+            if (entry != null) {
+                return entry.getSession();
+            }
+            return null;
+        }
+
+        private void runTasks(SessionQueue sessionTasksQueue) {
+            for (;;) {
+                Runnable task;
+                Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
+
+                synchronized (tasksQueue) {
+                    task = tasksQueue.poll();
+
+                    if (task == null) {
+                        sessionTasksQueue.processingCompleted = true;
+                        break;
+                    }
+                }
+
+                eventQueueHandler.polled(PriorityThreadPoolExecutor.this, 
(IoEvent) task);
+
+                runTask(task);
+            }
+        }
+
+        private void runTask(Runnable task) {
+            beforeExecute(thread, task);
+            boolean ran = false;
+            try {
+                task.run();
+                ran = true;
+                afterExecute(task, null);
+                completedTaskCount.incrementAndGet();
+            } catch (RuntimeException e) {
+                if (!ran) {
+                    afterExecute(task, e);
+                }
+                throw e;
+            }
+        }
     }
 
     /**
@@ -821,65 +828,65 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
      * session, and the current task state.
      */
     private class SessionQueue {
-       /** A queue of ordered event waiting to be processed */
-       private final Queue<Runnable> tasksQueue = new 
ConcurrentLinkedQueue<>();
+        /** A queue of ordered event waiting to be processed */
+        private final Queue<Runnable> tasksQueue = new 
ConcurrentLinkedQueue<>();
 
-       /** The current task state */
-       private boolean processingCompleted = true;
+        /** The current task state */
+        private boolean processingCompleted = true;
     }
 
     /**
-     * A class used to preserve first-in-first-out order of sessions that have
-     * equal priority.
+     * A class used to preserve first-in-first-out order of sessions that have 
equal
+     * priority.
      */
     static class SessionEntry implements Comparable<SessionEntry> {
-       private final long seqNum;
-       private final IoSession session;
-       private final Comparator<IoSession> comparator;
-
-       public SessionEntry(IoSession session, Comparator<IoSession> 
comparator) {
-           if (session == null) {
-               throw new IllegalArgumentException("session");
-           }
-           seqNum = seq.getAndIncrement();
-           this.session = session;
-           this.comparator = comparator;
-       }
-
-       public IoSession getSession() {
-           return session;
-       }
-
-       public int compareTo(SessionEntry other) {
-           if (other == this) {
-               return 0;
-           }
-
-           if (other.session == this.session) {
-               return 0;
-           }
-
-           // An exit signal should always be preferred.
-           if (this == EXIT_SIGNAL) {
-               return -1;
-           }
-           if (other == EXIT_SIGNAL) {
-               return 1;
-           }
-
-           int res = 0;
-
-           // If there's a comparator, use it to prioritise events.
-           if (comparator != null) {
-               res = comparator.compare(session, other.session);
-           }
-
-           // FIFO tiebreaker.
-           if (res == 0) {
-               res = (seqNum < other.seqNum ? -1 : 1);
-           }
-
-           return res;
-       }
+        private final long seqNum;
+        private final IoSession session;
+        private final Comparator<IoSession> comparator;
+
+        public SessionEntry(IoSession session, Comparator<IoSession> 
comparator) {
+            if (session == null) {
+                throw new IllegalArgumentException("session");
+            }
+            seqNum = seq.getAndIncrement();
+            this.session = session;
+            this.comparator = comparator;
+        }
+
+        public IoSession getSession() {
+            return session;
+        }
+
+        public int compareTo(SessionEntry other) {
+            if (other == this) {
+                return 0;
+            }
+
+            if (other.session == this.session) {
+                return 0;
+            }
+
+            // An exit signal should always be preferred.
+            if (this == EXIT_SIGNAL) {
+                return -1;
+            }
+            if (other == EXIT_SIGNAL) {
+                return 1;
+            }
+
+            int res = 0;
+
+            // If there's a comparator, use it to prioritise events.
+            if (comparator != null) {
+                res = comparator.compare(session, other.session);
+            }
+
+            // FIFO tiebreaker.
+            if (res == 0) {
+                res = (seqNum < other.seqNum ? -1 : 1);
+            }
+
+            return res;
+        }
     }
 }

Reply via email to