Updated Branches: refs/heads/trunk c4351ac44 -> a064a8a8e
o Using a static final IS_DEBUG constant to speed up the Log o Removed some final o Modified the NioUdpSession.convertToDirectBuffer() method : we don't anymore copy the ByteBuffer into a DirectBuffer, this is a waste of time. The gain is significant : between 10* to 100% Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/b39ee919 Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/b39ee919 Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/b39ee919 Branch: refs/heads/trunk Commit: b39ee919caa082f8015b9e4430f292e43bde1851 Parents: 43734ac Author: Emmanuel Lécharny <[email protected]> Authored: Mon May 27 20:00:48 2013 +0200 Committer: Emmanuel Lécharny <[email protected]> Committed: Mon May 27 20:00:48 2013 +0200 ---------------------------------------------------------------------- .../service/idlechecker/IndexedIdleChecker.java | 87 ++++++--- .../org/apache/mina/session/AbstractIoSession.java | 153 ++++++++++----- .../mina/transport/nio/AbstractNioSession.java | 36 +++- .../apache/mina/transport/nio/NioSelectorLoop.java | 67 +++++-- .../apache/mina/transport/nio/NioUdpServer.java | 17 ++- .../apache/mina/transport/nio/NioUdpSession.java | 37 ++-- 6 files changed, 264 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/b39ee919/core/src/main/java/org/apache/mina/service/idlechecker/IndexedIdleChecker.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/service/idlechecker/IndexedIdleChecker.java b/core/src/main/java/org/apache/mina/service/idlechecker/IndexedIdleChecker.java index 51ebaee..60066a6 100644 --- a/core/src/main/java/org/apache/mina/service/idlechecker/IndexedIdleChecker.java +++ b/core/src/main/java/org/apache/mina/service/idlechecker/IndexedIdleChecker.java @@ -62,8 +62,12 @@ public class IndexedIdleChecker implements IdleChecker { /** Maximum idle time in milliseconds : default to 1 hour */ private static final long MAX_IDLE_TIME_IN_MS = MAX_IDLE_TIME_IN_SEC * 1000L; + /** A logger for this class */ private static final Logger LOG = LoggerFactory.getLogger(IndexedIdleChecker.class); + // A speedup for logs + private static final boolean IS_DEBUG = LOG.isDebugEnabled(); + private static final AttributeKey<Integer> READ_IDLE_INDEX = AttributeKey.createKey(Integer.class, "idle.read.index"); @@ -104,7 +108,7 @@ public class IndexedIdleChecker implements IdleChecker { worker.interrupt(); // wait for worker to stop worker.join(); - } catch (final InterruptedException e) { + } catch (InterruptedException e) { // interrupted, we don't care much } } @@ -113,30 +117,46 @@ public class IndexedIdleChecker implements IdleChecker { * {@inheritDoc} */ @Override - public void sessionRead(final AbstractIoSession session, final long timeInMs) { - LOG.debug("session read event, compute idle index of session {}", session); + public void sessionRead(AbstractIoSession session, long timeInMs) { + if (IS_DEBUG) { + LOG.debug("session read event, compute idle index of session {}", session); + } // remove from the old index position - final Integer oldIndex = session.getAttribute(READ_IDLE_INDEX); + Integer oldIndex = session.getAttribute(READ_IDLE_INDEX); + if (oldIndex != null && readIdleSessionIndex[oldIndex] != null) { - LOG.debug("remove for old index {}", oldIndex); + if (IS_DEBUG) { + LOG.debug("remove for old index {}", oldIndex); + } + readIdleSessionIndex[oldIndex].remove(session); } - final long idleTimeInMs = session.getConfig().getIdleTimeInMillis(IdleStatus.READ_IDLE); + long idleTimeInMs = session.getConfig().getIdleTimeInMillis(IdleStatus.READ_IDLE); + // is idle enabled ? if (idleTimeInMs <= 0L) { - LOG.debug("no read idle configuration"); + if (IS_DEBUG) { + LOG.debug("no read idle configuration"); + } } else { - final int nextIdleTimeInSeconds = (int) ((timeInMs + idleTimeInMs) / 1000L); - final int index = nextIdleTimeInSeconds % MAX_IDLE_TIME_IN_SEC; - LOG.debug("computed index : {}", index); + int nextIdleTimeInSeconds = (int) ((timeInMs + idleTimeInMs) / 1000L); + int index = nextIdleTimeInSeconds % MAX_IDLE_TIME_IN_SEC; + + if (IS_DEBUG) { + LOG.debug("computed index : {}", index); + } + if (readIdleSessionIndex[index] == null) { readIdleSessionIndex[index] = Collections .newSetFromMap(new ConcurrentHashMap<AbstractIoSession, Boolean>()); } - LOG.debug("marking session {} idle for index {}", session, index); + if (IS_DEBUG) { + LOG.debug("marking session {} idle for index {}", session, index); + } + readIdleSessionIndex[index].add(session); session.setAttribute(READ_IDLE_INDEX, index); } @@ -146,23 +166,33 @@ public class IndexedIdleChecker implements IdleChecker { * {@inheritDoc} */ @Override - public void sessionWritten(final AbstractIoSession session, final long timeInMs) { - LOG.debug("session write event, compute idle index of session {}", session); + public void sessionWritten(AbstractIoSession session, long timeInMs) { + if (IS_DEBUG) { + LOG.debug("session write event, compute idle index of session {}", session); + } // remove from the old index position - final Integer oldIndex = session.getAttribute(WRITE_IDLE_INDEX); + Integer oldIndex = session.getAttribute(WRITE_IDLE_INDEX); + if (oldIndex != null && writeIdleSessionIndex[oldIndex] != null) { - LOG.debug("remove for old index {}", oldIndex); + if (IS_DEBUG) { + LOG.debug("remove for old index {}", oldIndex); + } + writeIdleSessionIndex[oldIndex].remove(session); } - final long idleTimeInMs = session.getConfig().getIdleTimeInMillis(IdleStatus.WRITE_IDLE); + long idleTimeInMs = session.getConfig().getIdleTimeInMillis(IdleStatus.WRITE_IDLE); + // is idle enabled ? if (idleTimeInMs <= 0L) { - LOG.debug("no write idle configuration"); + if (IS_DEBUG) { + LOG.debug("no write idle configuration"); + } } else { - final int nextIdleTimeInSeconds = (int) ((timeInMs + idleTimeInMs) / 1000L); - final int index = nextIdleTimeInSeconds % MAX_IDLE_TIME_IN_SEC; + int nextIdleTimeInSeconds = (int) ((timeInMs + idleTimeInMs) / 1000L); + int index = nextIdleTimeInSeconds % MAX_IDLE_TIME_IN_SEC; + if (writeIdleSessionIndex[index] == null) { writeIdleSessionIndex[index] = Collections .newSetFromMap(new ConcurrentHashMap<AbstractIoSession, Boolean>()); @@ -177,13 +207,13 @@ public class IndexedIdleChecker implements IdleChecker { * {@inheritDoc} */ @Override - public int processIdleSession(final long timeMs) { + public int processIdleSession(long timeMs) { int counter = 0; - final long delta = timeMs - lastCheckTimeMs; + long delta = timeMs - lastCheckTimeMs; if (LOG.isDebugEnabled()) { LOG.debug("checking idle time, last = {}, now = {}, delta = {}", new Object[] { lastCheckTimeMs, timeMs, - delta }); + delta }); } if (delta < 1000) { @@ -195,9 +225,9 @@ public class IndexedIdleChecker implements IdleChecker { // LOG.debug("first check, we start now"); // lastCheckTimeMs = System.currentTimeMillis() - 1000; // } - final int startIdx = ((int) (Math.max(lastCheckTimeMs, timeMs - MAX_IDLE_TIME_IN_MS + 1) / 1000L)) + int startIdx = ((int) (Math.max(lastCheckTimeMs, timeMs - MAX_IDLE_TIME_IN_MS + 1) / 1000L)) % MAX_IDLE_TIME_IN_SEC; - final int endIdx = ((int) (timeMs / 1000L)) % MAX_IDLE_TIME_IN_SEC; + int endIdx = ((int) (timeMs / 1000L)) % MAX_IDLE_TIME_IN_SEC; LOG.debug("scaning from index {} to index {}", startIdx, endIdx); @@ -218,15 +248,16 @@ public class IndexedIdleChecker implements IdleChecker { return counter; } - private int processIndex(final Set<AbstractIoSession>[] indexByTime, final int position, final IdleStatus status) { - final Set<AbstractIoSession> sessions = indexByTime[position]; + private int processIndex(Set<AbstractIoSession>[] indexByTime, int position, IdleStatus status) { + Set<AbstractIoSession> sessions = indexByTime[position]; + if (sessions == null) { return 0; } int counter = 0; - for (final AbstractIoSession idleSession : sessions) { + for (AbstractIoSession idleSession : sessions) { idleSession.setAttribute(status == IdleStatus.READ_IDLE ? READ_IDLE_INDEX : WRITE_IDLE_INDEX, null); // check if idle detection wasn't disabled since the index update if (idleSession.getConfig().getIdleTimeInMillis(status) > 0) { @@ -255,7 +286,7 @@ public class IndexedIdleChecker implements IdleChecker { try { sleep(GRANULARITY_IN_MS); processIdleSession(System.currentTimeMillis()); - } catch (final InterruptedException e) { + } catch (InterruptedException e) { break; } } http://git-wip-us.apache.org/repos/asf/mina/blob/b39ee919/core/src/main/java/org/apache/mina/session/AbstractIoSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/session/AbstractIoSession.java b/core/src/main/java/org/apache/mina/session/AbstractIoSession.java index af32853..ae28ef3 100644 --- a/core/src/main/java/org/apache/mina/session/AbstractIoSession.java +++ b/core/src/main/java/org/apache/mina/session/AbstractIoSession.java @@ -61,6 +61,9 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon /** The logger for this class */ private static final Logger LOG = LoggerFactory.getLogger(AbstractIoSession.class); + // A speedup for logs + private static final boolean IS_DEBUG = LOG.isDebugEnabled(); + /** unique identifier generator */ private static final AtomicInteger NEXT_ID = new AtomicInteger(0); @@ -137,7 +140,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * @param service the service this session is associated with * @param idleChecker the checker for idle session */ - public AbstractIoSession(final IoService service, final IdleChecker idleChecker) { + public AbstractIoSession(IoService service, IdleChecker idleChecker) { // generated a unique id id = NEXT_ID.getAndIncrement(); creationTime = System.currentTimeMillis(); @@ -146,7 +149,9 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon this.idleChecker = idleChecker; this.config = service.getSessionConfig(); - LOG.debug("Created new session with id : {}", id); + if (IS_DEBUG) { + LOG.debug("Created new session with id : {}", id); + } this.state = SessionState.CREATED; service.getManagedSessions().put(id, this); @@ -243,7 +248,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * {@inheritDoc} */ @Override - public void changeState(final SessionState to) { + public void changeState(SessionState to) { try { stateWriteLock.lock(); @@ -332,7 +337,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon /** * {@inheritDoc} */ - public void setSecured(final boolean secured) { + public void setSecured(boolean secured) { this.secured = secured; } @@ -340,8 +345,8 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * {@inheritDoc} */ @Override - public void initSecure(final SSLContext sslContext) throws SSLException { - final SslHelper sslHelper = new SslHelper(this, sslContext); + public void initSecure(SSLContext sslContext) throws SSLException { + SslHelper sslHelper = new SslHelper(this, sslContext); sslHelper.init(); attributes.setAttribute(SSL_HELPER, sslHelper); @@ -377,7 +382,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * * @param bytesCount number of extra bytes written */ - public void incrementWrittenBytes(final int bytesCount) { + public void incrementWrittenBytes(int bytesCount) { writtenBytes += bytesCount; } @@ -428,7 +433,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * @see #setAttribute(AttributeKey, Object) */ @Override - public final <T> T getAttribute(final AttributeKey<T> key, final T defaultValue) { + public final <T> T getAttribute(AttributeKey<T> key, T defaultValue) { return attributes.getAttribute(key, defaultValue); } @@ -439,7 +444,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * @see #setAttribute(AttributeKey, Object) */ @Override - public final <T> T getAttribute(final AttributeKey<T> key) { + public final <T> T getAttribute(AttributeKey<T> key) { return attributes.getAttribute(key); } @@ -457,7 +462,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * @see #getAttribute(AttributeKey) */ @Override - public final <T> T setAttribute(final AttributeKey<? extends T> key, final T value) { + public final <T> T setAttribute(AttributeKey<? extends T> key, T value) { return attributes.setAttribute(key, value); }; @@ -477,7 +482,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * @exception IllegalArgumentException if <code>key==null</code> */ @Override - public <T> T removeAttribute(final AttributeKey<T> key) { + public <T> T removeAttribute(AttributeKey<T> key) { return attributes.removeAttribute(key); } @@ -489,7 +494,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * {@inheritDoc} */ @Override - public void write(final Object message) { + public void write(Object message) { doWriteWithFuture(message, null); } @@ -497,14 +502,17 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * {@inheritDoc} */ @Override - public IoFuture<Void> writeWithFuture(final Object message) { - final IoFuture<Void> future = new DefaultWriteFuture(); + public IoFuture<Void> writeWithFuture(Object message) { + IoFuture<Void> future = new DefaultWriteFuture(); doWriteWithFuture(message, future); + return future; } - private void doWriteWithFuture(final Object message, final IoFuture<Void> future) { - LOG.debug("writing message {} to session {}", message, this); + private void doWriteWithFuture(Object message, IoFuture<Void> future) { + if (IS_DEBUG) { + LOG.debug("writing message {} to session {}", message, this); + } if ((state == SessionState.CLOSED) || (state == SessionState.CLOSING)) { LOG.error("writing to closed or closing session, the message is discarded"); @@ -522,9 +530,13 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon // ------------------------------------------------------------------------ /** send a caught exception to the {@link IoHandler} (if any) */ - protected void processException(final Exception t) { - LOG.debug("caught session exception ", t); - final IoHandler handler = getService().getIoHandler(); + protected void processException(Exception t) { + if (IS_DEBUG) { + LOG.debug("caught session exception ", t); + } + + IoHandler handler = getService().getIoHandler(); + if (handler != null) { handler.exceptionCaught(this, t); } @@ -534,18 +546,21 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * process session open event using the filter chain. To be called by the session {@link SelectorLoop} . */ public void processSessionOpen() { - LOG.debug("processing session open event"); + if (IS_DEBUG) { + LOG.debug("processing session open event"); + } try { - for (final IoFilter filter : chain) { + for (IoFilter filter : chain) { filter.sessionOpened(this); } - final IoHandler handler = getService().getIoHandler(); + IoHandler handler = getService().getIoHandler(); if (handler != null) { IoHandlerExecutor executor = getService().getIoHandlerExecutor(); + if (executor != null) { // asynchronous event executor.execute(new OpenEvent(this)); @@ -554,7 +569,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon handler.sessionOpened(this); } } - } catch (final RuntimeException e) { + } catch (RuntimeException e) { processException(e); } } @@ -563,13 +578,17 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * process session closed event using the filter chain. To be called by the session {@link SelectorLoop} . */ public void processSessionClosed() { - LOG.debug("processing session closed event"); + if (IS_DEBUG) { + LOG.debug("processing session closed event"); + } + try { - for (final IoFilter filter : chain) { + for (IoFilter filter : chain) { filter.sessionClosed(this); } - final IoHandler handler = getService().getIoHandler(); + IoHandler handler = getService().getIoHandler(); + if (handler != null) { IoHandlerExecutor executor = getService().getIoHandlerExecutor(); if (executor != null) { @@ -580,7 +599,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon handler.sessionClosed(this); } } - } catch (final RuntimeException e) { + } catch (RuntimeException e) { processException(e); } service.getManagedSessions().remove(id); @@ -589,16 +608,21 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon /** * process session idle event using the filter chain. To be called by the session {@link SelectorLoop} . */ - public void processSessionIdle(final IdleStatus status) { - LOG.debug("processing session idle {} event for session {}", status, this); + public void processSessionIdle(IdleStatus status) { + if (IS_DEBUG) { + LOG.debug("processing session idle {} event for session {}", status, this); + } try { - for (final IoFilter filter : chain) { + for (IoFilter filter : chain) { filter.sessionIdle(this, status); } - final IoHandler handler = getService().getIoHandler(); + + IoHandler handler = getService().getIoHandler(); + if (handler != null) { IoHandlerExecutor executor = getService().getIoHandlerExecutor(); + if (executor != null) { // asynchronous event executor.execute(new IdleEvent(this, status)); @@ -607,7 +631,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon handler.sessionIdle(this, status); } } - } catch (final RuntimeException e) { + } catch (RuntimeException e) { processException(e); } } @@ -625,8 +649,10 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * * @param message the received message */ - public void processMessageReceived(final ByteBuffer message) { - LOG.debug("processing message '{}' received event for session {}", message, this); + public void processMessageReceived(ByteBuffer message) { + if (IS_DEBUG) { + LOG.debug("processing message '{}' received event for session {}", message, this); + } tl.set(message); try { @@ -635,14 +661,22 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon lastReadTime = System.currentTimeMillis(); if (chain.length < 1) { - LOG.debug("Nothing to do, the chain is empty"); - final IoHandler handler = getService().getIoHandler(); + if (IS_DEBUG) { + LOG.debug("Nothing to do, the chain is empty"); + } + + IoHandler handler = getService().getIoHandler(); + if (handler != null) { IoHandlerExecutor executor = getService().getIoHandlerExecutor(); + if (executor != null) { // asynchronous event // copy the bytebuffer - LOG.debug("copying bytebuffer before pushing to the executor"); + if (IS_DEBUG) { + LOG.debug("copying bytebuffer before pushing to the executor"); + } + ByteBuffer original = message; ByteBuffer clone = ByteBuffer.allocate(original.capacity()); // copy from the beginning @@ -662,7 +696,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon // we call the first filter, it's supposed to call the next ones using the filter chain controller chain[readChainPosition].messageReceived(this, message, this); } - } catch (final RuntimeException e) { + } catch (RuntimeException e) { processException(e); } @@ -673,8 +707,10 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * * @param message the wrote message, should be transformed into ByteBuffer at the end of the filter chain */ - public void processMessageWriting(WriteRequest writeRequest, final IoFuture<Void> future) { - LOG.debug("processing message '{}' writing event for session {}", writeRequest, this); + public void processMessageWriting(WriteRequest writeRequest, IoFuture<Void> future) { + if (IS_DEBUG) { + LOG.debug("processing message '{}' writing event for session {}", writeRequest, this); + } try { // lastWriteRequest = null; @@ -684,8 +720,8 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon } else { writeChainPosition = chain.length - 1; // we call the first filter, it's supposed to call the next ones using the filter chain controller - final int position = writeChainPosition; - final IoFilter nextFilter = chain[position]; + int position = writeChainPosition; + IoFilter nextFilter = chain[position]; nextFilter.messageWriting(this, writeRequest, this); } @@ -693,23 +729,29 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon if (future != null) { writeRequest.setFuture(future); } - } catch (final RuntimeException e) { + } catch (RuntimeException e) { processException(e); } } - public void processMessageSent(final Object highLevelMessage) { - LOG.debug("processing message '{}' sent event for session {}", highLevelMessage, this); + public void processMessageSent(Object highLevelMessage) { + if (IS_DEBUG) { + LOG.debug("processing message '{}' sent event for session {}", highLevelMessage, this); + } try { - final int size = chain.length; + int size = chain.length; + for (int i = size - 1; i >= 0; i--) { chain[i].messageSent(this, highLevelMessage); } - final IoHandler handler = getService().getIoHandler(); + + IoHandler handler = getService().getIoHandler(); + if (handler != null) { IoHandlerExecutor executor = getService().getIoHandlerExecutor(); + if (executor != null) { // asynchronous event executor.execute(new SentEvent(this, highLevelMessage)); @@ -718,7 +760,7 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon handler.messageSent(this, highLevelMessage); } } - } catch (final RuntimeException e) { + } catch (RuntimeException e) { processException(e); } @@ -731,7 +773,9 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon */ @Override public void callWriteNextFilter(WriteRequest message) { - LOG.debug("calling next filter for writing for message '{}' position : {}", message, writeChainPosition); + if (IS_DEBUG) { + LOG.debug("calling next filter for writing for message '{}' position : {}", message, writeChainPosition); + } writeChainPosition--; @@ -749,19 +793,24 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon * {@inheritDoc} */ @Override - public void callReadNextFilter(final Object message) { + public void callReadNextFilter(Object message) { readChainPosition++; if (readChainPosition >= chain.length) { // end of chain processing - final IoHandler handler = getService().getIoHandler(); + IoHandler handler = getService().getIoHandler(); + if (handler != null) { IoHandlerExecutor executor = getService().getIoHandlerExecutor(); + if (executor != null) { // asynchronous event if (message == tl.get()) { // copy the bytebuffer - LOG.debug("copying bytebuffer before pushing to the executor"); + if (IS_DEBUG) { + LOG.debug("copying bytebuffer before pushing to the executor"); + } + ByteBuffer original = (ByteBuffer) message; ByteBuffer clone = ByteBuffer.allocate(original.capacity()); // copy from the beginning http://git-wip-us.apache.org/repos/asf/mina/blob/b39ee919/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java b/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java index 85cf170..6c2aa05 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java +++ b/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java @@ -45,9 +45,12 @@ import org.slf4j.LoggerFactory; * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ public abstract class AbstractNioSession extends AbstractIoSession { - + /** The logger for this class */ private static final Logger LOG = LoggerFactory.getLogger(AbstractNioSession.class); + // A speedup for logs + private static final boolean IS_DEBUG = LOG.isDebugEnabled(); + /** the NIO channel for this session */ protected final SelectableChannel channel; @@ -139,10 +142,13 @@ public abstract class AbstractNioSession extends AbstractIoSession { */ @Override public WriteRequest enqueueWriteRequest(WriteRequest writeRequest) { - LOG.debug("enqueueWriteRequest {}", writeRequest); + if (IS_DEBUG) { + LOG.debug("enqueueWriteRequest {}", writeRequest); + } + if (isConnectedSecured()) { // SSL/TLS : we have to encrypt the message - final SslHelper sslHelper = getAttribute(SSL_HELPER, null); + SslHelper sslHelper = getAttribute(SSL_HELPER, null); if (sslHelper == null) { throw new IllegalStateException(); @@ -162,7 +168,9 @@ public abstract class AbstractNioSession extends AbstractIoSession { // data in the channel immediately if we can int written = writeDirect(writeRequest.getMessage()); - LOG.debug("wrote {} bytes to {}", written, this); + if (IS_DEBUG) { + LOG.debug("wrote {} bytes to {}", written, this); + } if (written > 0) { incrementWrittenBytes(written); @@ -237,8 +245,10 @@ public abstract class AbstractNioSession extends AbstractIoSession { */ public void processWrite(SelectorLoop selectorLoop) { try { - LOG.debug("ready for write"); - LOG.debug("writable session : {}", this); + if (IS_DEBUG) { + LOG.debug("ready for write"); + LOG.debug("writable session : {}", this); + } do { // get a write request from the queue. We left it in the queue, @@ -252,15 +262,18 @@ public abstract class AbstractNioSession extends AbstractIoSession { } // The message is necessarily a ByteBuffer at this point - final ByteBuffer buf = (ByteBuffer) writeRequest.getMessage(); + ByteBuffer buf = (ByteBuffer) writeRequest.getMessage(); // Note that if the connection is secured, the buffer // already contains encrypted data. // Try to write the data, and get back the number of bytes // actually written - final int written = ((SocketChannel) channel).write(buf); - LOG.debug("wrote {} bytes to {}", written, this); + int written = ((SocketChannel) channel).write(buf); + + if (IS_DEBUG) { + LOG.debug("wrote {} bytes to {}", written, this); + } if (written > 0) { incrementWrittenBytes(written); @@ -307,7 +320,10 @@ public abstract class AbstractNioSession extends AbstractIoSession { synchronized (writeQueue) { if (writeQueue.isEmpty()) { if (isClosing()) { - LOG.debug("closing session {} have empty write queue, so we close it", this); + if (IS_DEBUG) { + LOG.debug("closing session {} have empty write queue, so we close it", this); + } + // we was flushing writes, now we to the close channelClose(); } else { http://git-wip-us.apache.org/repos/asf/mina/blob/b39ee919/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java b/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java index d3b7ab1..7dd2503 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java +++ b/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java @@ -41,7 +41,9 @@ import org.slf4j.LoggerFactory; */ public class NioSelectorLoop implements SelectorLoop { /** The logger for this class */ - private final Logger logger; + private static final Logger LOG = LoggerFactory.getLogger(NioSelectorLoop.class); + + private static final boolean IS_DEBUG = LOG.isDebugEnabled(); /** the selector managed by this class */ private Selector selector; @@ -77,19 +79,24 @@ public class NioSelectorLoop implements SelectorLoop { workerName += "-" + index; } - logger = LoggerFactory.getLogger(name); SelectorWorker worker = new SelectorWorker(workerName); try { - logger.debug("open a selector"); + if (IS_DEBUG) { + LOG.debug("open a selector"); + } + selector = Selector.open(); } catch (final IOException ioe) { - logger.error("Impossible to open a new NIO selector, O/S is out of file descriptor ?"); + LOG.error("Impossible to open a new NIO selector, O/S is out of file descriptor ?"); throw new IllegalStateException("Impossible to open a new NIO selector, O/S is out of file descriptor ?", ioe); } - logger.debug("starting worker thread"); + if (IS_DEBUG) { + LOG.debug("starting worker thread"); + } + worker.start(); } @@ -100,8 +107,11 @@ public class NioSelectorLoop implements SelectorLoop { @Override public void register(boolean accept, boolean connect, boolean read, boolean write, SelectorListener listener, SelectableChannel channel, RegistrationCallback callback) { - logger.debug("registering : {} for accept : {}, connect: {}, read : {}, write : {}, channel : {}", - new Object[] { listener, accept, connect, read, write, channel }); + if (IS_DEBUG) { + LOG.debug("registering : {} for accept : {}, connect: {}, read : {}, write : {}, channel : {}", + new Object[] { listener, accept, connect, read, write, channel }); + } + int ops = 0; if (accept) { @@ -133,12 +143,15 @@ public class NioSelectorLoop implements SelectorLoop { @Override public void modifyRegistration(boolean accept, boolean read, boolean write, final SelectorListener listener, SelectableChannel channel, boolean wakeup) { - logger.debug("modifying registration : {} for accept : {}, read : {}, write : {}, channel : {}", new Object[] { - listener, accept, read, write, channel }); + if (IS_DEBUG) { + LOG.debug("modifying registration : {} for accept : {}, read : {}, write : {}, channel : {}", new Object[] { + listener, accept, read, write, channel }); + } final SelectionKey key = channel.keyFor(selector); + if (key == null) { - logger.error("Trying to modify the registration of a not registered channel"); + LOG.error("Trying to modify the registration of a not registered channel"); return; } @@ -169,16 +182,22 @@ public class NioSelectorLoop implements SelectorLoop { */ @Override public void unregister(final SelectorListener listener, final SelectableChannel channel) { - logger.debug("unregistering : {}", listener); + if (IS_DEBUG) { + LOG.debug("unregistering : {}", listener); + } + final SelectionKey key = channel.keyFor(selector); + if (key == null) { - logger.error("Trying to modify the registration of a not registered channel"); + LOG.error("Trying to modify the registration of a not registered channel"); return; } key.cancel(); key.attach(null); - logger.debug("unregistering : {} done !", listener); + if (IS_DEBUG) { + LOG.debug("unregistering : {} done !", listener); + } } /** @@ -197,9 +216,15 @@ public class NioSelectorLoop implements SelectorLoop { for (;;) { try { - logger.debug("selecting..."); + if (IS_DEBUG) { + LOG.debug("selecting..."); + } + final int readyCount = selector.select(); - logger.debug("... done selecting : {} events", readyCount); + + if (IS_DEBUG) { + LOG.debug("... done selecting : {} events", readyCount); + } if (readyCount > 0) { final Iterator<SelectionKey> it = selector.selectedKeys().iterator(); @@ -215,9 +240,11 @@ public class NioSelectorLoop implements SelectorLoop { listener.ready(isAcceptable, isConnectable, isReadable, isReadable ? readBuffer : null, isWritable); // if you don't remove the event of the set, the selector will present you this event again - // and - // again - logger.debug("remove"); + // and again + if (IS_DEBUG) { + LOG.debug("remove"); + } + it.remove(); } } @@ -234,11 +261,11 @@ public class NioSelectorLoop implements SelectorLoop { } } catch (final ClosedChannelException ex) { // dead session.. - logger.error("socket is already dead", ex); + LOG.error("socket is already dead", ex); } } } catch (final Exception e) { - logger.error("Unexpected exception : ", e); + LOG.error("Unexpected exception : ", e); } } } http://git-wip-us.apache.org/repos/asf/mina/blob/b39ee919/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java b/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java index f7f1d40..1a83f12 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java +++ b/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java @@ -47,6 +47,8 @@ public class NioUdpServer extends AbstractUdpServer implements SelectorListener static final Logger LOG = LoggerFactory.getLogger(NioUdpServer.class); + private static final boolean IS_DEBUG = LOG.isDebugEnabled(); + // the bound local address private SocketAddress address = null; @@ -206,7 +208,7 @@ public class NioUdpServer extends AbstractUdpServer implements SelectorListener final boolean write) { // Process the reads first try { - System.err.println("remaining : " + readBuffer.remaining()); + //System.err.println("remaining : " + readBuffer.remaining()); final SocketAddress source = datagramChannel.receive(readBuffer); NioUdpSession session = null; @@ -218,10 +220,15 @@ public class NioUdpServer extends AbstractUdpServer implements SelectorListener session = createSession(source, datagramChannel); } if (read) { - LOG.debug("readable datagram for UDP service : {}", this); + if (IS_DEBUG) { + LOG.debug("readable datagram for UDP service : {}", this); + } + readBuffer.flip(); - LOG.debug("read {} bytes form {}", readBuffer.remaining(), source); + if (IS_DEBUG) { + LOG.debug("read {} bytes form {}", readBuffer.remaining(), source); + } session.receivedDatagram(readBuffer); @@ -232,7 +239,9 @@ public class NioUdpServer extends AbstractUdpServer implements SelectorListener session.processWrite(readSelectorLoop); } } else { - LOG.debug("Do data to read"); + if (IS_DEBUG) { + LOG.debug("Do data to read"); + } } } catch (final IOException ex) { http://git-wip-us.apache.org/repos/asf/mina/blob/b39ee919/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java b/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java index 22ac9e7..42a0285 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java +++ b/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java @@ -40,6 +40,8 @@ public class NioUdpSession extends AbstractNioSession implements SelectorListene private static final Logger LOG = LoggerFactory.getLogger(NioUdpSession.class); + private static final boolean IS_DEBUG = LOG.isDebugEnabled(); + private final SocketAddress localAddress; private final SocketAddress remoteAddress; @@ -241,20 +243,8 @@ public class NioUdpSession extends AbstractNioSession implements SelectorListene */ @Override protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest, boolean createNew) { - ByteBuffer message = (ByteBuffer) writeRequest.getMessage(); - - if (!message.isDirect()) { - int remaining = message.remaining(); - - ByteBuffer directBuffer = ByteBuffer.allocateDirect(remaining); - directBuffer.put(message); - directBuffer.flip(); - writeRequest.setMessage(directBuffer); - - return directBuffer; - } - - return message; + // Here, we don't create a new DirectBuffer. We let the underlying layer do the job for us + return (ByteBuffer) writeRequest.getMessage(); } /** @@ -271,13 +261,15 @@ public class NioUdpSession extends AbstractNioSession implements SelectorListene @Override public void ready(boolean accept, boolean connect, boolean read, ByteBuffer readBuffer, boolean write) { - if (LOG.isDebugEnabled()) { + if (IS_DEBUG) { LOG.debug("session {} ready for accept={}, connect={}, read={}, write={}", new Object[] { this, accept, - connect, read, write }); + connect, read, write }); } if (read) { - LOG.debug("readable datagram for UDP service : {}", this); + if (IS_DEBUG) { + LOG.debug("readable datagram for UDP service : {}", this); + } // Read everything we can up to the buffer size try { @@ -285,10 +277,17 @@ public class NioUdpSession extends AbstractNioSession implements SelectorListene readBuffer.flip(); int readbytes = readBuffer.remaining(); - LOG.debug("read {} bytes", readbytes); + + if (IS_DEBUG) { + LOG.debug("read {} bytes", readbytes); + } + if (readbytes <= 0) { // session closed by the remote peer - LOG.debug("session closed by the remote peer"); + if (IS_DEBUG) { + LOG.debug("session closed by the remote peer"); + } + close(true); } else { receivedDatagram(readBuffer);
