Repository: mina Updated Branches: refs/heads/2.0 b1661ec24 -> 53fdc798e
o Added a counter to avoid creating a new selector again and again: if select() returns 0 immediately, we give it 10 more chances to return correctly before registering a new selector. o Fixed some Sonar violations Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/53fdc798 Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/53fdc798 Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/53fdc798 Branch: refs/heads/2.0 Commit: 53fdc798eb8397fec4661f2222c6c901de8e0f11 Parents: b1661ec Author: Emmanuel Lécharny <elecha...@symas.com> Authored: Thu Aug 18 05:42:25 2016 +0200 Committer: Emmanuel Lécharny <elecha...@symas.com> Committed: Thu Aug 18 05:42:25 2016 +0200 ---------------------------------------------------------------------- .../polling/AbstractPollingIoProcessor.java | 52 ++++++++++++-------- 1 file changed, 32 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/53fdc798/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java ---------------------------------------------------------------------- diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java index c0f7ba8..6097038 100644 --- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java +++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java @@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory; */ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> { /** A logger for this class */ - private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class); + private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class); /** * A timeout used for the select, as we need to get out to deal with 
idle @@ -76,7 +76,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im private static final long SELECT_TIMEOUT = 1000L; /** A map containing the last Thread ID for each class */ - private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>(); + private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>(); /** This IoProcessor instance name */ private final String threadName; @@ -85,22 +85,22 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im private final Executor executor; /** A Session queue containing the newly created sessions */ - private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>(); + private final Queue<S> newSessions = new ConcurrentLinkedQueue<>(); /** A queue used to store the sessions to be removed */ - private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>(); + private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>(); /** A queue used to store the sessions to be flushed */ - private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>(); + private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>(); /** * A queue used to store the sessions which have a trafficControl to be * updated */ - private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>(); + private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>(); /** The processor thread : it handles the incoming messages */ - private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>(); + private final AtomicReference<Processor> processorRef = new AtomicReference<>(); private long lastIdleCheckTime; @@ -158,6 +158,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im /** * {@inheritDoc} */ + @Override public final boolean isDisposing() { return disposing; } @@ -165,6 
+166,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im /** * {@inheritDoc} */ + @Override public final boolean isDisposed() { return disposed; } @@ -172,6 +174,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im /** * {@inheritDoc} */ + @Override public final void dispose() { if (disposed || disposing) { return; @@ -252,7 +255,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im * @return the state of the session */ protected abstract SessionState getState(S session); - + + /** * Tells if the session ready for writing * @@ -360,6 +364,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im /** * {@inheritDoc} */ + @Override public final void add(S session) { if (disposed || disposing) { throw new IllegalStateException("Already disposed."); @@ -373,6 +378,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im /** * {@inheritDoc} */ + @Override public final void remove(S session) { scheduleRemove(session); startupProcessor(); @@ -387,6 +393,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im /** * {@inheritDoc} */ + @Override public void write(S session, WriteRequest writeRequest) { WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); @@ -400,6 +407,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im /** * {@inheritDoc} */ + @Override public final void flush(S session) { // add the session to the queue if it's not already // in the queue, then wake up the select() @@ -454,7 +462,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im * * @throws IOException If we got an exception */ - abstract protected void registerNewSelector() throws IOException; + protected abstract void registerNewSelector() throws IOException; /** * Check that the select() has not exited immediately just because of a @@ 
-464,7 +472,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im * @return <tt>true</tt> if a connection has been brutally closed. * @throws IOException If we got an exception */ - abstract protected boolean isBrokenConnection() throws IOException; + protected abstract boolean isBrokenConnection() throws IOException; /** * Loops over the new sessions blocking queue and returns the number of @@ -593,7 +601,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); WriteRequest req; - List<WriteRequest> failedRequests = new ArrayList<WriteRequest>(); + List<WriteRequest> failedRequests = new ArrayList<>(); if ((req = writeRequestQueue.poll(session)) != null) { Object message = req.getMessage(); @@ -652,11 +660,9 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im } // Process writes - if (isWritable(session) && !session.isWriteSuspended()) { + if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) { // add the session to the queue, if it's not already there - if (session.setScheduledForFlush(true)) { - flushingSessions.add(session); - } + flushingSessions.add(session); } } @@ -707,7 +713,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im } if (ret < 0) { - // scheduleRemove(session); IoFilterChain filterChain = session.getFilterChain(); filterChain.fireInputClosed(); } @@ -828,7 +833,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im session.setCurrentWriteRequest(req); } - int localWrittenBytes = 0; + int localWrittenBytes; Object message = req.getMessage(); if (message instanceof IoBuffer) { @@ -1025,6 +1030,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im /** * {@inheritDoc} */ + @Override public void updateTrafficControl(S session) { // try { @@ -1054,6 +1060,7 
@@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im int nSessions = 0; lastIdleCheckTime = System.currentTimeMillis(); + int nbTries = 10; for (;;) { try { @@ -1064,7 +1071,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im long t0 = System.currentTimeMillis(); int selected = select(SELECT_TIMEOUT); long t1 = System.currentTimeMillis(); - long delta = (t1 - t0); + long delta = t1 - t0; if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) { // Last chance : the select() may have been @@ -1072,7 +1079,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im if (isBrokenConnection()) { LOG.warn("Broken connection"); } else { - LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0)); // Ok, we are hit by the nasty epoll // spinning. // Basically, there is a race condition @@ -1086,7 +1092,13 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im // CPU. // We have to destroy the selector, and // register all the socket on a new one. - registerNewSelector(); + if (nbTries == 0) { + LOG.warn("Create a new selector. Selected is 0, delta = " + delta); + registerNewSelector(); + nbTries = 10; + } else { + nbTries--; + } } }