This is an automated email from the ASF dual-hosted git repository. elecharny pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/mina.git
commit be9e2ba673d88cdf0a54e08013672a996726a7bc Author: emmanuel lecharny <[email protected]> AuthorDate: Tue Apr 9 10:16:27 2019 +0200 Backported the DIRMINA-1078 patch --- .../executor/PriorityThreadPoolExecutor.java | 897 +++++++++++++++++++++ 1 file changed, 897 insertions(+) diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java new file mode 100644 index 0000000..721005c --- /dev/null +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java @@ -0,0 +1,897 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.executor; + +import org.apache.mina.core.session.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s + * within a session (similar to {@link OrderedThreadPoolExecutor}) and allows + * some sessions to be prioritized over other sessions. + * <p> + * If you don't need to maintain the order of events per session, please use + * {@link UnorderedThreadPoolExecutor}. + * <p> + * If you don't need to prioritize sessions, please use + * {@link OrderedThreadPoolExecutor}. + * + * @author <a href="http://mina.apache.org">Apache MINA Project</a> + * @author Guus der Kinderen, [email protected] + * @org.apache.xbean.XBean + */ +// TODO this class currently copies OrderedThreadPoolExecutor, and changes the +// BlockingQueue used for the waitingSessions field. This code duplication +// should be avoided. +public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { + /** A logger for this class (commented as it breaks MDCFlter tests) */ + private static final Logger LOGGER = LoggerFactory.getLogger(PriorityThreadPoolExecutor.class); + + /** Generates sequential identifiers that ensure FIFO behavior. */ + private static final AtomicLong seq = new AtomicLong(0); + + /** A default value for the initial pool size */ + private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0; + + /** A default value for the maximum pool size */ + private static final int DEFAULT_MAX_THREAD_POOL = 16; + + /** A default value for the KeepAlive delay */ + private static final int DEFAULT_KEEP_ALIVE = 30; + + private static final SessionEntry EXIT_SIGNAL = new SessionEntry(new DummySession(), null); + + /** + * A key stored into the session's attribute for the event tasks being queued + */ + private static final AttributeKey TASKS_QUEUE = new AttributeKey(PriorityThreadPoolExecutor.class, "tasksQueue"); + + /** A queue used to store the available sessions */ + private final BlockingQueue<SessionEntry> waitingSessions; + + private final Set<Worker> workers = new HashSet<>(); + + private volatile int largestPoolSize; + + private final AtomicInteger idleWorkers = new AtomicInteger(); + + private long completedTaskCount; + + private volatile boolean shutdown; + + private final IoEventQueueHandler eventQueueHandler; + + private final Comparator<IoSession> comparator; + + /** + * Creates a default ThreadPool, with default values : - minimum pool size is 0 + * - maximum pool size is 16 - keepAlive set to 30 seconds - A default + * ThreadFactory - All events are accepted + */ + public PriorityThreadPoolExecutor() { + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, + Executors.defaultThreadFactory(), null, null); + } + + /** + * Creates a default ThreadPool, with default values : - minimum pool size is 0 + * - maximum pool size is 16 - keepAlive set to 30 seconds - A default + * ThreadFactory - All events are accepted + */ + public PriorityThreadPoolExecutor(Comparator<IoSession> comparator) { + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, + Executors.defaultThreadFactory(), null, comparator); + } + + /** + * Creates a default ThreadPool, with default values : - minimum pool size is 0 + * - keepAlive set to 30 seconds - A default ThreadFactory - All events are + * accepted + * + * @param maximumPoolSize + * The maximum pool size + */ + public PriorityThreadPoolExecutor(int maximumPoolSize) { + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, + Executors.defaultThreadFactory(), null, null); + } + + /** + * Creates a default ThreadPool, with default values : - minimum pool size is 0 + * - keepAlive set to 30 seconds - A default ThreadFactory - All events are + * accepted + * + * @param maximumPoolSize + * The maximum pool size + */ + public PriorityThreadPoolExecutor(int maximumPoolSize, Comparator<IoSession> comparator) { + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, + Executors.defaultThreadFactory(), null, comparator); + } + + /** + * Creates a default ThreadPool, with default values : - keepAlive set to 30 + * seconds - A default ThreadFactory - All events are accepted + * + * @param corePoolSize + * The initial pool sizePoolSize + * @param maximumPoolSize + * The maximum pool size + */ + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize) { + this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), + null, null); + } + + /** + * Creates a default ThreadPool, with default values : - A default ThreadFactory + * - All events are accepted + * + * @param corePoolSize + * The initial pool sizePoolSize + * @param maximumPoolSize + * The maximum pool size + * @param keepAliveTime + * Default duration for a thread + * @param unit + * Time unit used for the keepAlive value + */ + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null, null); + } + + /** + * Creates a default ThreadPool, with default values : - A default ThreadFactory + * + * @param corePoolSize + * The initial pool sizePoolSize + * @param maximumPoolSize + * The maximum pool size + * @param keepAliveTime + * Default duration for a thread + * @param unit + * Time unit used for the keepAlive value + * @param eventQueueHandler + * The queue used to store events + */ + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + IoEventQueueHandler eventQueueHandler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler, + null); + } + + /** + * Creates a default ThreadPool, with default values : - A default ThreadFactory + * + * @param corePoolSize + * The initial pool sizePoolSize + * @param maximumPoolSize + * The maximum pool size + * @param keepAliveTime + * Default duration for a thread + * @param unit + * Time unit used for the keepAlive value + * @param threadFactory + * The factory used to create threads + */ + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, null); + } + + /** + * Creates a new instance of a PrioritisedOrderedThreadPoolExecutor. + * + * @param corePoolSize + * The initial pool sizePoolSize + * @param maximumPoolSize + * The maximum pool size + * @param keepAliveTime + * Default duration for a thread + * @param unit + * Time unit used for the keepAlive value + * @param threadFactory + * The factory used to create threads + * @param eventQueueHandler + * The queue used to store events + */ + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler, Comparator<IoSession> comparator) { + // We have to initialize the pool with default values (0 and 1) in order + // to + // handle the exception in a better way. We can't add a try {} catch() + // {} + // around the super() call. + super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, + new AbortPolicy()); + + if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) { + throw new IllegalArgumentException("corePoolSize: " + corePoolSize); + } + + if ((maximumPoolSize <= 0) || (maximumPoolSize < corePoolSize)) { + throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); + } + + // Now, we can setup the pool sizes + super.setMaximumPoolSize(maximumPoolSize); + super.setCorePoolSize(corePoolSize); + + // The queueHandler might be null. + if (eventQueueHandler == null) { + this.eventQueueHandler = IoEventQueueHandler.NOOP; + } else { + this.eventQueueHandler = eventQueueHandler; + } + + // The comparator can be null. + this.comparator = comparator; + + if (this.comparator == null) { + this.waitingSessions = new LinkedBlockingQueue<>(); + } else { + this.waitingSessions = new PriorityBlockingQueue<>(); + } + } + + /** + * Get the session's tasks queue. + */ + private SessionQueue getSessionTasksQueue(IoSession session) { + SessionQueue queue = (SessionQueue) session.getAttribute(TASKS_QUEUE); + + if (queue == null) { + queue = new SessionQueue(); + SessionQueue oldQueue = (SessionQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue); + + if (oldQueue != null) { + queue = oldQueue; + } + } + + return queue; + } + + /** + * @return The associated queue handler. + */ + public IoEventQueueHandler getQueueHandler() { + return eventQueueHandler; + } + + /** + * {@inheritDoc} + */ + @Override + public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { + // Ignore the request. It must always be AbortPolicy. + } + + /** + * Add a new thread to execute a task, if needed and possible. It depends on the + * current pool size. If it's full, we do nothing. + */ + private void addWorker() { + synchronized (workers) { + if (workers.size() >= super.getMaximumPoolSize()) { + return; + } + + // Create a new worker, and add it to the thread pool + Worker worker = new Worker(); + Thread thread = getThreadFactory().newThread(worker); + + // As we have added a new thread, it's considered as idle. + idleWorkers.incrementAndGet(); + + // Now, we can start it. + thread.start(); + workers.add(worker); + + if (workers.size() > largestPoolSize) { + largestPoolSize = workers.size(); + } + } + } + + /** + * Add a new Worker only if there are no idle worker. + */ + private void addWorkerIfNecessary() { + if (idleWorkers.get() == 0) { + synchronized (workers) { + if (workers.isEmpty() || (idleWorkers.get() == 0)) { + addWorker(); + } + } + } + } + + private void removeWorker() { + synchronized (workers) { + if (workers.size() <= super.getCorePoolSize()) { + return; + } + waitingSessions.offer(EXIT_SIGNAL); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void setMaximumPoolSize(int maximumPoolSize) { + if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) { + throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); + } + + synchronized (workers) { + super.setMaximumPoolSize(maximumPoolSize); + int difference = workers.size() - maximumPoolSize; + while (difference > 0) { + removeWorker(); + --difference; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + + long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + + synchronized (workers) { + while (!isTerminated()) { + long waitTime = deadline - System.currentTimeMillis(); + if (waitTime <= 0) { + break; + } + + workers.wait(waitTime); + } + } + return isTerminated(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isShutdown() { + return shutdown; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isTerminated() { + if (!shutdown) { + return false; + } + + synchronized (workers) { + return workers.isEmpty(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() { + if (shutdown) { + return; + } + + shutdown = true; + + synchronized (workers) { + for (int i = workers.size(); i > 0; i--) { + waitingSessions.offer(EXIT_SIGNAL); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<Runnable> shutdownNow() { + shutdown(); + + List<Runnable> answer = new ArrayList<>(); + SessionEntry entry; + + while ((entry = waitingSessions.poll()) != null) { + if (entry == EXIT_SIGNAL) { + waitingSessions.offer(EXIT_SIGNAL); + Thread.yield(); // Let others take the signal. + continue; + } + + SessionQueue sessionTasksQueue = (SessionQueue) entry.getSession().getAttribute(TASKS_QUEUE); + + synchronized (sessionTasksQueue.tasksQueue) { + + for (Runnable task : sessionTasksQueue.tasksQueue) { + getQueueHandler().polled(this, (IoEvent) task); + answer.add(task); + } + + sessionTasksQueue.tasksQueue.clear(); + } + } + + return answer; + } + + /** + * A Helper class used to print the list of events being queued. + */ + private void print(Queue<Runnable> queue, IoEvent event) { + StringBuilder sb = new StringBuilder(); + sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId()); + boolean first = true; + sb.append("\nQueue : ["); + + for (Runnable elem : queue) { + if (first) { + first = false; + } else { + sb.append(", "); + } + + sb.append(((IoEvent) elem).getType()).append(", "); + } + + sb.append("]\n"); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(sb.toString()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void execute(Runnable task) { + if (shutdown) { + rejectTask(task); + } + + // Check that it's a IoEvent task + checkTaskType(task); + + IoEvent event = (IoEvent) task; + + // Get the associated session + IoSession session = event.getSession(); + + // Get the session's queue of events + SessionQueue sessionTasksQueue = getSessionTasksQueue(session); + Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; + + boolean offerSession; + + // propose the new event to the event queue handler. If we + // use a throttle queue handler, the message may be rejected + // if the maximum size has been reached. + boolean offerEvent = eventQueueHandler.accept(this, event); + + if (offerEvent) { + // Ok, the message has been accepted + synchronized (tasksQueue) { + // Inject the event into the executor taskQueue + tasksQueue.offer(event); + + if (sessionTasksQueue.processingCompleted) { + sessionTasksQueue.processingCompleted = false; + offerSession = true; + } else { + offerSession = false; + } + + if (LOGGER.isDebugEnabled()) { + print(tasksQueue, event); + } + } + } else { + offerSession = false; + } + + if (offerSession) { + // As the tasksQueue was empty, the task has been executed + // immediately, so we can move the session to the queue + // of sessions waiting for completion. + waitingSessions.offer(new SessionEntry(session, comparator)); + } + + addWorkerIfNecessary(); + + if (offerEvent) { + eventQueueHandler.offered(this, event); + } + } + + private void rejectTask(Runnable task) { + getRejectedExecutionHandler().rejectedExecution(task, this); + } + + private void checkTaskType(Runnable task) { + if (!(task instanceof IoEvent)) { + throw new IllegalArgumentException("task must be an IoEvent or its subclass."); + } + } + + /** + * {@inheritDoc} + */ + @Override + public int getActiveCount() { + synchronized (workers) { + return workers.size() - idleWorkers.get(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public long getCompletedTaskCount() { + synchronized (workers) { + long answer = completedTaskCount; + for (Worker w : workers) { + answer += w.completedTaskCount.get(); + } + + return answer; + } + } + + /** + * {@inheritDoc} + */ + @Override + public int getLargestPoolSize() { + return largestPoolSize; + } + + /** + * {@inheritDoc} + */ + @Override + public int getPoolSize() { + synchronized (workers) { + return workers.size(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public long getTaskCount() { + return getCompletedTaskCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isTerminating() { + synchronized (workers) { + return isShutdown() && !isTerminated(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public int prestartAllCoreThreads() { + int answer = 0; + synchronized (workers) { + for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) { + addWorker(); + answer++; + } + } + return answer; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean prestartCoreThread() { + synchronized (workers) { + if (workers.size() < super.getCorePoolSize()) { + addWorker(); + return true; + } else { + return false; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public BlockingQueue<Runnable> getQueue() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc} + */ + @Override + public void purge() { + // Nothing to purge in this implementation. + } + + /** + * {@inheritDoc} + */ + @Override + public boolean remove(Runnable task) { + checkTaskType(task); + IoEvent event = (IoEvent) task; + IoSession session = event.getSession(); + SessionQueue sessionTasksQueue = (SessionQueue) session.getAttribute(TASKS_QUEUE); + + if (sessionTasksQueue == null) { + return false; + } + + boolean removed; + Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; + + synchronized (tasksQueue) { + removed = tasksQueue.remove(task); + } + + if (removed) { + getQueueHandler().polled(this, event); + } + + return removed; + } + + /** + * {@inheritDoc} + */ + @Override + public void setCorePoolSize(int corePoolSize) { + if (corePoolSize < 0) { + throw new IllegalArgumentException("corePoolSize: " + corePoolSize); + } + if (corePoolSize > super.getMaximumPoolSize()) { + throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize"); + } + + synchronized (workers) { + if (super.getCorePoolSize() > corePoolSize) { + for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) { + removeWorker(); + } + } + super.setCorePoolSize(corePoolSize); + } + } + + private class Worker implements Runnable { + + private AtomicLong completedTaskCount = new AtomicLong(0); + + private Thread thread; + + /** + * @inheritedDoc + */ + @Override + public void run() { + thread = Thread.currentThread(); + + try { + for (;;) { + IoSession session = fetchSession(); + + idleWorkers.decrementAndGet(); + + if (session == null) { + synchronized (workers) { + if (workers.size() > getCorePoolSize()) { + // Remove now to prevent duplicate exit. + workers.remove(this); + break; + } + } + } + + if (session == EXIT_SIGNAL) { + break; + } + + try { + if (session != null) { + runTasks(getSessionTasksQueue(session)); + } + } finally { + idleWorkers.incrementAndGet(); + } + } + } finally { + synchronized (workers) { + workers.remove(this); + PriorityThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get(); + workers.notifyAll(); + } + } + } + + private IoSession fetchSession() { + SessionEntry entry = null; + long currentTime = System.currentTimeMillis(); + long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS); + + for (;;) { + try { + long waitTime = deadline - currentTime; + + if (waitTime <= 0) { + break; + } + + try { + entry = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS); + break; + } finally { + if (entry != null) { + currentTime = System.currentTimeMillis(); + } + } + } catch (InterruptedException e) { + // Ignore. + continue; + } + } + + if (entry != null) { + return entry.getSession(); + } + return null; + } + + private void runTasks(SessionQueue sessionTasksQueue) { + for (;;) { + Runnable task; + Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; + + synchronized (tasksQueue) { + task = tasksQueue.poll(); + + if (task == null) { + sessionTasksQueue.processingCompleted = true; + break; + } + } + + eventQueueHandler.polled(PriorityThreadPoolExecutor.this, (IoEvent) task); + + runTask(task); + } + } + + private void runTask(Runnable task) { + beforeExecute(thread, task); + boolean ran = false; + try { + task.run(); + ran = true; + afterExecute(task, null); + completedTaskCount.incrementAndGet(); + } catch (RuntimeException e) { + if (!ran) { + afterExecute(task, e); + } + throw e; + } + } + } + + /** + * A class used to store the ordered list of events to be processed by the + * session, and the current task state. + */ + private class SessionQueue { + /** A queue of ordered event waiting to be processed */ + private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<>(); + + /** The current task state */ + private boolean processingCompleted = true; + } + + /** + * A class used to preserve first-in-first-out order of sessions that have equal + * priority. + */ + static class SessionEntry implements Comparable<SessionEntry> { + private final long seqNum; + private final IoSession session; + private final Comparator<IoSession> comparator; + + public SessionEntry(IoSession session, Comparator<IoSession> comparator) { + if (session == null) { + throw new IllegalArgumentException("session"); + } + seqNum = seq.getAndIncrement(); + this.session = session; + this.comparator = comparator; + } + + public IoSession getSession() { + return session; + } + + public int compareTo(SessionEntry other) { + if (other == this) { + return 0; + } + + if (other.session == this.session) { + return 0; + } + + // An exit signal should always be preferred. + if (this == EXIT_SIGNAL) { + return -1; + } + if (other == EXIT_SIGNAL) { + return 1; + } + + int res = 0; + + // If there's a comparator, use it to prioritise events. + if (comparator != null) { + res = comparator.compare(session, other.session); + } + + // FIFO tiebreaker. + if (res == 0) { + res = (seqNum < other.seqNum ? -1 : 1); + } + + return res; + } + } +}
