Repository: mina Updated Branches: refs/heads/2.1.0 c95171c88 -> f858147e7
Removed a spurious printStackTrace (DIRMINA-1092) Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/bd7ee98f Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/bd7ee98f Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/bd7ee98f Branch: refs/heads/2.1.0 Commit: bd7ee98f1ec49f462ff140f3662910395e934281 Parents: 56ca189 Author: Emmanuel Lécharny <[email protected]> Authored: Mon Jul 30 18:21:49 2018 +0200 Committer: Emmanuel Lécharny <[email protected]> Committed: Mon Aug 27 17:50:34 2018 +0200 ---------------------------------------------------------------------- .../polling/AbstractPollingIoProcessor.java | 2 - .../executor/PriorityThreadPoolExecutor.java | 917 ++++++++++--------- 2 files changed, 462 insertions(+), 457 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/bd7ee98f/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java ---------------------------------------------------------------------- diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java index 2ee0b96..78807ee 100644 --- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java +++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java @@ -1106,8 +1106,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im try { localWrittenBytes = write(session, buf, length); } catch (IOException ioe) { - ioe.printStackTrace(); - // We have had an issue while trying to send data to the // peer : let's close the session. buf.free(); http://git-wip-us.apache.org/repos/asf/mina/blob/bd7ee98f/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java index dd6b6e1..b0ace13 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java @@ -65,8 +65,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { private static final SessionEntry EXIT_SIGNAL = new SessionEntry(new DummySession(), null); /** - * A key stored into the session's attribute for the event tasks being - * queued + * A key stored into the session's attribute for the event tasks being queued */ private static final AttributeKey TASKS_QUEUE = new AttributeKey(PriorityThreadPoolExecutor.class, "tasksQueue"); @@ -88,45 +87,49 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { private final Comparator<IoSession> comparator; /** - * Creates a default ThreadPool, with default values : - minimum pool size - * is 0 - maximum pool size is 16 - keepAlive set to 30 seconds - A default + * Creates a default ThreadPool, with default values : - minimum pool size is 0 + * - maximum pool size is 16 - keepAlive set to 30 seconds - A default * ThreadFactory - All events are accepted */ public PriorityThreadPoolExecutor() { - this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, null); + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, + Executors.defaultThreadFactory(), null, null); } /** - * Creates a default ThreadPool, with default values : - minimum pool size - * is 0 - maximum pool size is 16 - keepAlive set to 30 seconds - A default + * Creates a default ThreadPool, with default values : - minimum pool size is 0 + * - maximum pool size is 16 - keepAlive set to 30 seconds - A default * ThreadFactory - All events are accepted */ public PriorityThreadPoolExecutor(Comparator<IoSession> comparator) { - this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, comparator); + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, + Executors.defaultThreadFactory(), null, comparator); } /** - * Creates a default ThreadPool, with default values : - minimum pool size - * is 0 - keepAlive set to 30 seconds - A default ThreadFactory - All events - * are accepted + * Creates a default ThreadPool, with default values : - minimum pool size is 0 + * - keepAlive set to 30 seconds - A default ThreadFactory - All events are + * accepted * * @param maximumPoolSize * The maximum pool size */ public PriorityThreadPoolExecutor(int maximumPoolSize) { - this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, null); + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, + Executors.defaultThreadFactory(), null, null); } /** - * Creates a default ThreadPool, with default values : - minimum pool size - * is 0 - keepAlive set to 30 seconds - A default ThreadFactory - All events - * are accepted + * Creates a default ThreadPool, with default values : - minimum pool size is 0 + * - keepAlive set to 30 seconds - A default ThreadFactory - All events are + * accepted * * @param maximumPoolSize * The maximum pool size */ public PriorityThreadPoolExecutor(int maximumPoolSize, Comparator<IoSession> comparator) { - this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, comparator); + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, + Executors.defaultThreadFactory(), null, comparator); } /** @@ -139,12 +142,13 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { * The maximum pool size */ public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize) { - this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, null); + this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), + null, null); } /** - * Creates a default ThreadPool, with default values : - A default - * ThreadFactory - All events are accepted + * Creates a default ThreadPool, with default values : - A default ThreadFactory + * - All events are accepted * * @param corePoolSize * The initial pool sizePoolSize @@ -156,12 +160,11 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { * Time unit used for the keepAlive value */ public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null, null); + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null, null); } /** - * Creates a default ThreadPool, with default values : - A default - * ThreadFactory + * Creates a default ThreadPool, with default values : - A default ThreadFactory * * @param corePoolSize * The initial pool sizePoolSize @@ -174,13 +177,14 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { * @param eventQueueHandler * The queue used to store events */ - public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, IoEventQueueHandler eventQueueHandler) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler, null); + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + IoEventQueueHandler eventQueueHandler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler, + null); } /** - * Creates a default ThreadPool, with default values : - A default - * ThreadFactory + * Creates a default ThreadPool, with default values : - A default ThreadFactory * * @param corePoolSize * The initial pool sizePoolSize @@ -193,8 +197,9 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { * @param threadFactory * The factory used to create threads */ - public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, null); + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, null); } /** @@ -213,66 +218,68 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { * @param eventQueueHandler * The queue used to store events */ - public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler, Comparator<IoSession> comparator) { - // We have to initialize the pool with default values (0 and 1) in order - // to - // handle the exception in a better way. We can't add a try {} catch() - // {} - // around the super() call. - super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy()); + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler, Comparator<IoSession> comparator) { + // We have to initialize the pool with default values (0 and 1) in order + // to + // handle the exception in a better way. We can't add a try {} catch() + // {} + // around the super() call. + super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, + new AbortPolicy()); - if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) { - throw new IllegalArgumentException("corePoolSize: " + corePoolSize); - } + if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) { + throw new IllegalArgumentException("corePoolSize: " + corePoolSize); + } - if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) { - throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); - } + if ((maximumPoolSize <= 0) || (maximumPoolSize < corePoolSize)) { + throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); + } - // Now, we can setup the pool sizes - super.setCorePoolSize(corePoolSize); - super.setMaximumPoolSize(maximumPoolSize); + // Now, we can setup the pool sizes + super.setMaximumPoolSize(maximumPoolSize); + super.setCorePoolSize(corePoolSize); - // The queueHandler might be null. - if (eventQueueHandler == null) { - this.eventQueueHandler = IoEventQueueHandler.NOOP; - } else { - this.eventQueueHandler = eventQueueHandler; - } + // The queueHandler might be null. + if (eventQueueHandler == null) { + this.eventQueueHandler = IoEventQueueHandler.NOOP; + } else { + this.eventQueueHandler = eventQueueHandler; + } - // The comparator can be null. - this.comparator = comparator; + // The comparator can be null. + this.comparator = comparator; - if (this.comparator == null) { - this.waitingSessions = new LinkedBlockingQueue<>(); - } else { - this.waitingSessions = new PriorityBlockingQueue<>(); - } + if (this.comparator == null) { + this.waitingSessions = new LinkedBlockingQueue<>(); + } else { + this.waitingSessions = new PriorityBlockingQueue<>(); + } } /** * Get the session's tasks queue. */ private SessionQueue getSessionTasksQueue(IoSession session) { - SessionQueue queue = (SessionQueue) session.getAttribute(TASKS_QUEUE); + SessionQueue queue = (SessionQueue) session.getAttribute(TASKS_QUEUE); - if (queue == null) { - queue = new SessionQueue(); - SessionQueue oldQueue = (SessionQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue); + if (queue == null) { + queue = new SessionQueue(); + SessionQueue oldQueue = (SessionQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue); - if (oldQueue != null) { - queue = oldQueue; - } - } + if (oldQueue != null) { + queue = oldQueue; + } + } - return queue; + return queue; } /** * @return The associated queue handler. */ public IoEventQueueHandler getQueueHandler() { - return eventQueueHandler; + return eventQueueHandler; } /** @@ -280,56 +287,56 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { - // Ignore the request. It must always be AbortPolicy. + // Ignore the request. It must always be AbortPolicy. } /** - * Add a new thread to execute a task, if needed and possible. It depends on - * the current pool size. If it's full, we do nothing. + * Add a new thread to execute a task, if needed and possible. It depends on the + * current pool size. If it's full, we do nothing. */ private void addWorker() { - synchronized (workers) { - if (workers.size() >= super.getMaximumPoolSize()) { - return; - } + synchronized (workers) { + if (workers.size() >= super.getMaximumPoolSize()) { + return; + } - // Create a new worker, and add it to the thread pool - Worker worker = new Worker(); - Thread thread = getThreadFactory().newThread(worker); + // Create a new worker, and add it to the thread pool + Worker worker = new Worker(); + Thread thread = getThreadFactory().newThread(worker); - // As we have added a new thread, it's considered as idle. - idleWorkers.incrementAndGet(); + // As we have added a new thread, it's considered as idle. + idleWorkers.incrementAndGet(); - // Now, we can start it. - thread.start(); - workers.add(worker); + // Now, we can start it. + thread.start(); + workers.add(worker); - if (workers.size() > largestPoolSize) { - largestPoolSize = workers.size(); - } - } + if (workers.size() > largestPoolSize) { + largestPoolSize = workers.size(); + } + } } /** * Add a new Worker only if there are no idle worker. */ private void addWorkerIfNecessary() { - if (idleWorkers.get() == 0) { - synchronized (workers) { - if (workers.isEmpty() || (idleWorkers.get() == 0)) { - addWorker(); - } - } - } + if (idleWorkers.get() == 0) { + synchronized (workers) { + if (workers.isEmpty() || (idleWorkers.get() == 0)) { + addWorker(); + } + } + } } private void removeWorker() { - synchronized (workers) { - if (workers.size() <= super.getCorePoolSize()) { - return; - } - waitingSessions.offer(EXIT_SIGNAL); - } + synchronized (workers) { + if (workers.size() <= super.getCorePoolSize()) { + return; + } + waitingSessions.offer(EXIT_SIGNAL); + } } /** @@ -337,18 +344,18 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public void setMaximumPoolSize(int maximumPoolSize) { - if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) { - throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); - } + if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) { + throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); + } - synchronized (workers) { - super.setMaximumPoolSize(maximumPoolSize); - int difference = workers.size() - maximumPoolSize; - while (difference > 0) { - removeWorker(); - --difference; - } - } + synchronized (workers) { + super.setMaximumPoolSize(maximumPoolSize); + int difference = workers.size() - maximumPoolSize; + while (difference > 0) { + removeWorker(); + --difference; + } + } } /** @@ -357,19 +364,19 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + long deadline = System.currentTimeMillis() + unit.toMillis(timeout); - synchronized (workers) { - while (!isTerminated()) { - long waitTime = deadline - System.currentTimeMillis(); - if (waitTime <= 0) { - break; - } + synchronized (workers) { + while (!isTerminated()) { + long waitTime = deadline - System.currentTimeMillis(); + if (waitTime <= 0) { + break; + } - workers.wait(waitTime); - } - } - return isTerminated(); + workers.wait(waitTime); + } + } + return isTerminated(); } /** @@ -377,7 +384,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public boolean isShutdown() { - return shutdown; + return shutdown; } /** @@ -385,13 +392,13 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public boolean isTerminated() { - if (!shutdown) { - return false; - } + if (!shutdown) { + return false; + } - synchronized (workers) { - return workers.isEmpty(); - } + synchronized (workers) { + return workers.isEmpty(); + } } /** @@ -399,17 +406,17 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public void shutdown() { - if (shutdown) { - return; - } + if (shutdown) { + return; + } - shutdown = true; + shutdown = true; - synchronized (workers) { - for (int i = workers.size(); i > 0; i--) { - waitingSessions.offer(EXIT_SIGNAL); - } - } + synchronized (workers) { + for (int i = workers.size(); i > 0; i--) { + waitingSessions.offer(EXIT_SIGNAL); + } + } } /** @@ -417,53 +424,53 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public List<Runnable> shutdownNow() { - shutdown(); + shutdown(); - List<Runnable> answer = new ArrayList<>(); - SessionEntry entry; + List<Runnable> answer = new ArrayList<>(); + SessionEntry entry; - while ((entry = waitingSessions.poll()) != null) { - if (entry == EXIT_SIGNAL) { - waitingSessions.offer(EXIT_SIGNAL); - Thread.yield(); // Let others take the signal. - continue; - } + while ((entry = waitingSessions.poll()) != null) { + if (entry == EXIT_SIGNAL) { + waitingSessions.offer(EXIT_SIGNAL); + Thread.yield(); // Let others take the signal. + continue; + } - SessionQueue sessionTasksQueue = (SessionQueue) entry.getSession().getAttribute(TASKS_QUEUE); + SessionQueue sessionTasksQueue = (SessionQueue) entry.getSession().getAttribute(TASKS_QUEUE); - synchronized (sessionTasksQueue.tasksQueue) { + synchronized (sessionTasksQueue.tasksQueue) { - for (Runnable task : sessionTasksQueue.tasksQueue) { - getQueueHandler().polled(this, (IoEvent) task); - answer.add(task); - } + for (Runnable task : sessionTasksQueue.tasksQueue) { + getQueueHandler().polled(this, (IoEvent) task); + answer.add(task); + } - sessionTasksQueue.tasksQueue.clear(); - } - } + sessionTasksQueue.tasksQueue.clear(); + } + } - return answer; + return answer; } /** * A Helper class used to print the list of events being queued. */ private void print(Queue<Runnable> queue, IoEvent event) { - StringBuilder sb = new StringBuilder(); - sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId()); - boolean first = true; - sb.append("\nQueue : ["); - for (Runnable elem : queue) { - if (first) { - first = false; - } else { - sb.append(", "); - } + StringBuilder sb = new StringBuilder(); + sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId()); + boolean first = true; + sb.append("\nQueue : ["); + for (Runnable elem : queue) { + if (first) { + first = false; + } else { + sb.append(", "); + } - sb.append(((IoEvent) elem).getType()).append(", "); - } - sb.append("]\n"); - LOGGER.debug(sb.toString()); + sb.append(((IoEvent) elem).getType()).append(", "); + } + sb.append("]\n"); + LOGGER.debug(sb.toString()); } /** @@ -471,72 +478,72 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public void execute(Runnable task) { - if (shutdown) { - rejectTask(task); - } + if (shutdown) { + rejectTask(task); + } - // Check that it's a IoEvent task - checkTaskType(task); + // Check that it's a IoEvent task + checkTaskType(task); - IoEvent event = (IoEvent) task; + IoEvent event = (IoEvent) task; - // Get the associated session - IoSession session = event.getSession(); + // Get the associated session + IoSession session = event.getSession(); - // Get the session's queue of events - SessionQueue sessionTasksQueue = getSessionTasksQueue(session); - Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; + // Get the session's queue of events + SessionQueue sessionTasksQueue = getSessionTasksQueue(session); + Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; - boolean offerSession; + boolean offerSession; - // propose the new event to the event queue handler. If we - // use a throttle queue handler, the message may be rejected - // if the maximum size has been reached. - boolean offerEvent = eventQueueHandler.accept(this, event); + // propose the new event to the event queue handler. If we + // use a throttle queue handler, the message may be rejected + // if the maximum size has been reached. + boolean offerEvent = eventQueueHandler.accept(this, event); - if (offerEvent) { - // Ok, the message has been accepted - synchronized (tasksQueue) { - // Inject the event into the executor taskQueue - tasksQueue.offer(event); + if (offerEvent) { + // Ok, the message has been accepted + synchronized (tasksQueue) { + // Inject the event into the executor taskQueue + tasksQueue.offer(event); - if (sessionTasksQueue.processingCompleted) { - sessionTasksQueue.processingCompleted = false; - offerSession = true; - } else { - offerSession = false; - } + if (sessionTasksQueue.processingCompleted) { + sessionTasksQueue.processingCompleted = false; + offerSession = true; + } else { + offerSession = false; + } - if (LOGGER.isDebugEnabled()) { - print(tasksQueue, event); - } - } - } else { - offerSession = false; - } + if (LOGGER.isDebugEnabled()) { + print(tasksQueue, event); + } + } + } else { + offerSession = false; + } - if (offerSession) { - // As the tasksQueue was empty, the task has been executed - // immediately, so we can move the session to the queue - // of sessions waiting for completion. - waitingSessions.offer(new SessionEntry(session, comparator)); - } + if (offerSession) { + // As the tasksQueue was empty, the task has been executed + // immediately, so we can move the session to the queue + // of sessions waiting for completion. + waitingSessions.offer(new SessionEntry(session, comparator)); + } - addWorkerIfNecessary(); + addWorkerIfNecessary(); - if (offerEvent) { - eventQueueHandler.offered(this, event); - } + if (offerEvent) { + eventQueueHandler.offered(this, event); + } } private void rejectTask(Runnable task) { - getRejectedExecutionHandler().rejectedExecution(task, this); + getRejectedExecutionHandler().rejectedExecution(task, this); } private void checkTaskType(Runnable task) { - if (!(task instanceof IoEvent)) { - throw new IllegalArgumentException("task must be an IoEvent or its subclass."); - } + if (!(task instanceof IoEvent)) { + throw new IllegalArgumentException("task must be an IoEvent or its subclass."); + } } /** @@ -544,9 +551,9 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public int getActiveCount() { - synchronized (workers) { - return workers.size() - idleWorkers.get(); - } + synchronized (workers) { + return workers.size() - idleWorkers.get(); + } } /** @@ -554,14 +561,14 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public long getCompletedTaskCount() { - synchronized (workers) { - long answer = completedTaskCount; - for (Worker w : workers) { - answer += w.completedTaskCount.get(); - } + synchronized (workers) { + long answer = completedTaskCount; + for (Worker w : workers) { + answer += w.completedTaskCount.get(); + } - return answer; - } + return answer; + } } /** @@ -569,7 +576,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public int getLargestPoolSize() { - return largestPoolSize; + return largestPoolSize; } /** @@ -577,9 +584,9 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public int getPoolSize() { - synchronized (workers) { - return workers.size(); - } + synchronized (workers) { + return workers.size(); + } } /** @@ -587,7 +594,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public long getTaskCount() { - return getCompletedTaskCount(); + return getCompletedTaskCount(); } /** @@ -595,9 +602,9 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public boolean isTerminating() { - synchronized (workers) { - return isShutdown() && !isTerminated(); - } + synchronized (workers) { + return isShutdown() && !isTerminated(); + } } /** @@ -605,14 +612,14 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public int prestartAllCoreThreads() { - int answer = 0; - synchronized (workers) { - for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) { - addWorker(); - answer++; - } - } - return answer; + int answer = 0; + synchronized (workers) { + for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) { + addWorker(); + answer++; + } + } + return answer; } /** @@ -620,14 +627,14 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public boolean prestartCoreThread() { - synchronized (workers) { - if (workers.size() < super.getCorePoolSize()) { - addWorker(); - return true; - } else { - return false; - } - } + synchronized (workers) { + if (workers.size() < super.getCorePoolSize()) { + addWorker(); + return true; + } else { + return false; + } + } } /** @@ -635,7 +642,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public BlockingQueue<Runnable> getQueue() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(); } /** @@ -643,7 +650,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public void purge() { - // Nothing to purge in this implementation. + // Nothing to purge in this implementation. } /** @@ -651,27 +658,27 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public boolean remove(Runnable task) { - checkTaskType(task); - IoEvent event = (IoEvent) task; - IoSession session = event.getSession(); - SessionQueue sessionTasksQueue = (SessionQueue) session.getAttribute(TASKS_QUEUE); + checkTaskType(task); + IoEvent event = (IoEvent) task; + IoSession session = event.getSession(); + SessionQueue sessionTasksQueue = (SessionQueue) session.getAttribute(TASKS_QUEUE); - if (sessionTasksQueue == null) { - return false; - } + if (sessionTasksQueue == null) { + return false; + } - boolean removed; - Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; + boolean removed; + Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; - synchronized (tasksQueue) { - removed = tasksQueue.remove(task); - } + synchronized (tasksQueue) { + removed = tasksQueue.remove(task); + } - if (removed) { - getQueueHandler().polled(this, event); - } + if (removed) { + getQueueHandler().polled(this, event); + } - return removed; + return removed; } /** @@ -679,141 +686,141 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { */ @Override public void setCorePoolSize(int corePoolSize) { - if (corePoolSize < 0) { - throw new IllegalArgumentException("corePoolSize: " + corePoolSize); - } - if (corePoolSize > super.getMaximumPoolSize()) { - throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize"); - } - - synchronized (workers) { - if (super.getCorePoolSize() > corePoolSize) { - for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) { - removeWorker(); - } - } - super.setCorePoolSize(corePoolSize); - } + if (corePoolSize < 0) { + throw new IllegalArgumentException("corePoolSize: " + corePoolSize); + } + if (corePoolSize > super.getMaximumPoolSize()) { + throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize"); + } + + synchronized (workers) { + if (super.getCorePoolSize() > corePoolSize) { + for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) { + removeWorker(); + } + } + super.setCorePoolSize(corePoolSize); + } } private class Worker implements Runnable { - private AtomicLong completedTaskCount = new AtomicLong(0); - - private Thread thread; - - /** - * @inheritedDoc - */ - @Override - public void run() { - thread = Thread.currentThread(); - - try { - for (;;) { - IoSession session = fetchSession(); - - idleWorkers.decrementAndGet(); - - if (session == null) { - synchronized (workers) { - if (workers.size() > getCorePoolSize()) { - // Remove now to prevent duplicate exit. - workers.remove(this); - break; - } - } - } - - if (session == EXIT_SIGNAL) { - break; - } - - try { - if (session != null) { - runTasks(getSessionTasksQueue(session)); - } - } finally { - idleWorkers.incrementAndGet(); - } - } - } finally { - synchronized (workers) { - workers.remove(this); - PriorityThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get(); - workers.notifyAll(); - } - } - } - - private IoSession fetchSession() { - SessionEntry entry = null; - long currentTime = System.currentTimeMillis(); - long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS); - - for (;;) { - try { - long waitTime = deadline - currentTime; - - if (waitTime <= 0) { - break; - } - - try { - entry = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS); - break; - } finally { - if (entry != null) { - currentTime = System.currentTimeMillis(); - } - } - } catch (InterruptedException e) { - // Ignore. - continue; - } - } - - if (entry != null) { - return entry.getSession(); - } - return null; - } - - private void runTasks(SessionQueue sessionTasksQueue) { - for (;;) { - Runnable task; - Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; - - synchronized (tasksQueue) { - task = tasksQueue.poll(); - - if (task == null) { - sessionTasksQueue.processingCompleted = true; - break; - } - } - - eventQueueHandler.polled(PriorityThreadPoolExecutor.this, (IoEvent) task); - - runTask(task); - } - } - - private void runTask(Runnable task) { - beforeExecute(thread, task); - boolean ran = false; - try { - task.run(); - ran = true; - afterExecute(task, null); - completedTaskCount.incrementAndGet(); - } catch (RuntimeException e) { - if (!ran) { - afterExecute(task, e); - } - throw e; - } - } + private AtomicLong completedTaskCount = new AtomicLong(0); + + private Thread thread; + + /** + * @inheritedDoc + */ + @Override + public void run() { + thread = Thread.currentThread(); + + try { + for (;;) { + IoSession session = fetchSession(); + + idleWorkers.decrementAndGet(); + + if (session == null) { + synchronized (workers) { + if (workers.size() > getCorePoolSize()) { + // Remove now to prevent duplicate exit. + workers.remove(this); + break; + } + } + } + + if (session == EXIT_SIGNAL) { + break; + } + + try { + if (session != null) { + runTasks(getSessionTasksQueue(session)); + } + } finally { + idleWorkers.incrementAndGet(); + } + } + } finally { + synchronized (workers) { + workers.remove(this); + PriorityThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get(); + workers.notifyAll(); + } + } + } + + private IoSession fetchSession() { + SessionEntry entry = null; + long currentTime = System.currentTimeMillis(); + long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS); + + for (;;) { + try { + long waitTime = deadline - currentTime; + + if (waitTime <= 0) { + break; + } + + try { + entry = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS); + break; + } finally { + if (entry != null) { + currentTime = System.currentTimeMillis(); + } + } + } catch (InterruptedException e) { + // Ignore. + continue; + } + } + + if (entry != null) { + return entry.getSession(); + } + return null; + } + + private void runTasks(SessionQueue sessionTasksQueue) { + for (;;) { + Runnable task; + Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; + + synchronized (tasksQueue) { + task = tasksQueue.poll(); + + if (task == null) { + sessionTasksQueue.processingCompleted = true; + break; + } + } + + eventQueueHandler.polled(PriorityThreadPoolExecutor.this, (IoEvent) task); + + runTask(task); + } + } + + private void runTask(Runnable task) { + beforeExecute(thread, task); + boolean ran = false; + try { + task.run(); + ran = true; + afterExecute(task, null); + completedTaskCount.incrementAndGet(); + } catch (RuntimeException e) { + if (!ran) { + afterExecute(task, e); + } + throw e; + } + } } /** @@ -821,65 +828,65 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { * session, and the current task state. */ private class SessionQueue { - /** A queue of ordered event waiting to be processed */ - private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<>(); + /** A queue of ordered event waiting to be processed */ + private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<>(); - /** The current task state */ - private boolean processingCompleted = true; + /** The current task state */ + private boolean processingCompleted = true; } /** - * A class used to preserve first-in-first-out order of sessions that have - * equal priority. + * A class used to preserve first-in-first-out order of sessions that have equal + * priority. */ static class SessionEntry implements Comparable<SessionEntry> { - private final long seqNum; - private final IoSession session; - private final Comparator<IoSession> comparator; - - public SessionEntry(IoSession session, Comparator<IoSession> comparator) { - if (session == null) { - throw new IllegalArgumentException("session"); - } - seqNum = seq.getAndIncrement(); - this.session = session; - this.comparator = comparator; - } - - public IoSession getSession() { - return session; - } - - public int compareTo(SessionEntry other) { - if (other == this) { - return 0; - } - - if (other.session == this.session) { - return 0; - } - - // An exit signal should always be preferred. - if (this == EXIT_SIGNAL) { - return -1; - } - if (other == EXIT_SIGNAL) { - return 1; - } - - int res = 0; - - // If there's a comparator, use it to prioritise events. - if (comparator != null) { - res = comparator.compare(session, other.session); - } - - // FIFO tiebreaker. - if (res == 0) { - res = (seqNum < other.seqNum ? -1 : 1); - } - - return res; - } + private final long seqNum; + private final IoSession session; + private final Comparator<IoSession> comparator; + + public SessionEntry(IoSession session, Comparator<IoSession> comparator) { + if (session == null) { + throw new IllegalArgumentException("session"); + } + seqNum = seq.getAndIncrement(); + this.session = session; + this.comparator = comparator; + } + + public IoSession getSession() { + return session; + } + + public int compareTo(SessionEntry other) { + if (other == this) { + return 0; + } + + if (other.session == this.session) { + return 0; + } + + // An exit signal should always be preferred. + if (this == EXIT_SIGNAL) { + return -1; + } + if (other == EXIT_SIGNAL) { + return 1; + } + + int res = 0; + + // If there's a comparator, use it to prioritise events. + if (comparator != null) { + res = comparator.compare(session, other.session); + } + + // FIFO tiebreaker. + if (res == 0) { + res = (seqNum < other.seqNum ? -1 : 1); + } + + return res; + } } }
