Author: elecharny
Date: Sat Jan 16 15:27:24 2010
New Revision: 899977
URL: http://svn.apache.org/viewvc?rev=899977&view=rev
Log:
o Javadoc additions
o Removed a useless method (scheduleTrafficControl)
o Got rid of most of the for(;;) constructions where they were not necessary
o Added some comments
o Refactored the code to make it easier on the eye
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=899977&r1=899976&r2=899977&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
Sat Jan 16 15:27:24 2010
@@ -35,9 +35,11 @@
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.file.FileRegion;
import org.apache.mina.core.filterchain.IoFilterChain;
+import org.apache.mina.core.filterchain.IoFilterChainBuilder;
import org.apache.mina.core.future.DefaultIoFuture;
import org.apache.mina.core.service.AbstractIoService;
import org.apache.mina.core.service.IoProcessor;
+import org.apache.mina.core.service.IoServiceListenerSupport;
import org.apache.mina.core.session.AbstractIoSession;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
@@ -62,8 +64,7 @@
public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession>
implements IoProcessor<T> {
/** A logger for this class */
- private final static Logger LOG = LoggerFactory
- .getLogger(IoProcessor.class);
+ private final static Logger LOG =
LoggerFactory.getLogger(IoProcessor.class);
/**
* The maximum loop count for a write operation until
@@ -82,10 +83,13 @@
/** A map containing the last Thread ID for each class */
private static final Map<Class<?>, AtomicInteger> threadIds = new
HashMap<Class<?>, AtomicInteger>();
+ /** A lock used to protect the processor creation */
private final Object lock = new Object();
+ /** This IoProcessor instance name */
private final String threadName;
+ /** The executor to use when we need to start the inner Processor */
private final Executor executor;
/** A Session queue containing the newly created sessions */
@@ -329,10 +333,8 @@
/**
* Initialize the polling of a session. Add it to the polling process.
*
- * @param session
- * the {@link IoSession} to add to the polling
- * @throws Exception
- * any exception thrown by the underlying system calls
+ * @param session the {@link IoSession} to add to the polling
+ * @throws Exception any exception thrown by the underlying system calls
*/
protected abstract void init(T session) throws Exception;
@@ -426,30 +428,25 @@
* {@inheritDoc}
*/
public final void flush(T session) {
- if (scheduleFlush(session)) {
+ if (session.setScheduledForFlush(true)) {
+ flushingSessions.add(session);
wakeup();
}
}
- private boolean scheduleFlush(T session) {
+ private void scheduleFlush(T session) {
if (session.setScheduledForFlush(true)) {
// add the session to the queue
flushingSessions.add(session);
- return true;
}
- return false;
}
/**
* {@inheritDoc}
*/
public final void updateTrafficMask(T session) {
- scheduleTrafficControl(session);
- wakeup();
- }
-
- private void scheduleTrafficControl(T session) {
trafficControllingSessions.add(session);
+ wakeup();
}
/**
@@ -460,8 +457,7 @@
synchronized (lock) {
if (processor == null) {
processor = new Processor();
- executor.execute(new NamePreservingRunnable(processor,
- threadName));
+ executor.execute(new NamePreservingRunnable(processor,
threadName));
}
}
@@ -500,14 +496,7 @@
private int handleNewSessions() {
int addedSessions = 0;
- for (;;) {
- T session = newSessions.poll();
-
- if (session == null) {
- // All new sessions have been handled
- break;
- }
-
+ for (T session = newSessions.poll(); session != null; session =
newSessions.poll()) {
if (addNow(session)) {
// A new session has been created
addedSessions++;
@@ -517,6 +506,15 @@
return addedSessions;
}
+ /**
+ * Process a new session :
+ * - initialize it
+ * - create its chain
+ * - fire the CREATED listeners if any
+ *
+ * @param session The session to create
+ * @return true if the session has been registered
+ */
private boolean addNow(T session) {
boolean registered = false;
boolean notified = false;
@@ -526,13 +524,15 @@
registered = true;
// Build the filter chain of this session.
- session.getService().getFilterChainBuilder().buildFilterChain(
- session.getFilterChain());
+ IoFilterChainBuilder chainBuilder =
session.getService().getFilterChainBuilder();
+ chainBuilder.buildFilterChain(session.getFilterChain());
// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
- ((AbstractIoService) session.getService()).getListeners()
- .fireSessionCreated(session);
+ // Propagate the SESSION_CREATED event up to the chain
+ IoServiceListenerSupport listeners = ((AbstractIoService)
session.getService()).getListeners();
+ listeners.fireSessionCreated(session);
+
notified = true;
} catch (Throwable e) {
if (notified) {
@@ -544,6 +544,7 @@
wakeup();
} else {
ExceptionMonitor.getInstance().exceptionCaught(e);
+
try {
destroy(session);
} catch (Exception e1) {
@@ -553,50 +554,47 @@
}
}
}
+
return registered;
}
private int removeSessions() {
int removedSessions = 0;
- for (;;) {
- T session = removingSessions.poll();
-
- if (session == null) {
- // No session to remove. Get out.
- return removedSessions;
- }
-
+ for (T session = removingSessions.poll();session != null;session =
removingSessions.poll()) {
SessionState state = getState(session);
// Now deal with the removal accordingly to the session's state
switch (state) {
- case OPENED:
- // Try to remove this session
- if (removeNow(session)) {
- removedSessions++;
- }
-
- break;
-
- case CLOSING:
- // Skip if channel is already closed
- break;
-
- case OPENING:
- // Remove session from the newSessions queue and
- // remove it
- newSessions.remove(session);
-
- if (removeNow(session)) {
- removedSessions++;
- }
- break;
-
- default:
- throw new IllegalStateException(String.valueOf(state));
+ case OPENED:
+ // Try to remove this session
+ if (removeNow(session)) {
+ removedSessions++;
+ }
+
+ break;
+
+ case CLOSING:
+ // Skip if channel is already closed
+ break;
+
+ case OPENING:
+ // Remove session from the newSessions queue and
+ // remove it
+ newSessions.remove(session);
+
+ if (removeNow(session)) {
+ removedSessions++;
+ }
+
+ break;
+
+ default:
+ throw new IllegalStateException(String.valueOf(state));
}
}
+
+ return removedSessions;
}
private boolean removeNow(T session) {
@@ -623,9 +621,10 @@
List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
if ((req = writeRequestQueue.poll(session)) != null) {
- Object m = req.getMessage();
- if (m instanceof IoBuffer) {
- IoBuffer buf = (IoBuffer) req.getMessage();
+ Object message = req.getMessage();
+
+ if (message instanceof IoBuffer) {
+ IoBuffer buf = (IoBuffer)message;
// The first unwritten empty buffer must be
// forwarded to the filter chain.
@@ -650,10 +649,12 @@
if (!failedRequests.isEmpty()) {
WriteToClosedSessionException cause = new
WriteToClosedSessionException(
failedRequests);
+
for (WriteRequest r : failedRequests) {
session.decreaseScheduledBytesAndMessages(r);
r.getFuture().setException(cause);
}
+
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(cause);
}
@@ -677,8 +678,9 @@
}
// Process writes
- if (isWritable(session) && !session.isWriteSuspended()) {
- scheduleFlush(session);
+ if (isWritable(session) && !session.isWriteSuspended() &&
session.setScheduledForFlush(true)) {
+ // add the session to the queue
+ flushingSessions.add(session);
}
}
@@ -697,12 +699,14 @@
if (hasFragmentation) {
while ((ret = read(session, buf)) > 0) {
readBytes += ret;
+
if (!buf.hasRemaining()) {
break;
}
}
} else {
ret = read(session, buf);
+
if (ret > 0) {
readBytes = ret;
}
@@ -731,12 +735,10 @@
} catch (Throwable e) {
if (e instanceof IOException) {
if (!(e instanceof PortUnreachableException)
- || !AbstractDatagramSessionConfig.class
- .isAssignableFrom(config.getClass())
- || ((AbstractDatagramSessionConfig) config)
- .isCloseOnPortUnreachable())
-
+ ||
!AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
+ || ((AbstractDatagramSessionConfig)
config).isCloseOnPortUnreachable()) {
scheduleRemove(session);
+ }
}
IoFilterChain filterChain = session.getFilterChain();
@@ -752,8 +754,12 @@
}
}
+ /**
+ * Write all the pending messages
+ */
private void flush(long currentTime) {
final T firstSession = flushingSessions.peek();
+
if (firstSession == null) {
return;
}
@@ -765,41 +771,44 @@
SessionState state = getState(session);
switch (state) {
- case OPENED:
- try {
- boolean flushedAll = flushNow(session, currentTime);
- if (flushedAll
- && !session.getWriteRequestQueue().isEmpty(session)
- && !session.isScheduledForFlush()) {
- scheduleFlush(session);
+ case OPENED:
+ try {
+ boolean flushedAll = flushNow(session, currentTime);
+
+ if (flushedAll
+ &&
!session.getWriteRequestQueue().isEmpty(session)
+ && !session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
+ } catch (Exception e) {
+ scheduleRemove(session);
+ IoFilterChain filterChain = session.getFilterChain();
+ filterChain.fireExceptionCaught(e);
}
- } catch (Exception e) {
- scheduleRemove(session);
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
- }
-
- break;
-
- case CLOSING:
- // Skip if the channel is already closed.
- break;
-
- case OPENING:
- // Retry later if session is not yet fully initialized.
- // (In case that Session.write() is called before addSession()
- // is processed)
- scheduleFlush(session);
- return;
-
- default:
- throw new IllegalStateException(String.valueOf(state));
+
+ break;
+
+ case CLOSING:
+ // Skip if the channel is already closed.
+ break;
+
+ case OPENING:
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.write() is called before
addSession()
+ // is processed)
+ scheduleFlush(session);
+ return;
+
+ default:
+ throw new IllegalStateException(String.valueOf(state));
}
session = flushingSessions.peek();
- if (session == null || session == firstSession) {
+
+ if ((session == null) || (session == firstSession)) {
break;
}
+
session = flushingSessions.poll();
}
}
@@ -823,26 +832,32 @@
+ (session.getConfig().getMaxReadBufferSize() >>> 1);
int writtenBytes = 0;
WriteRequest req = null;
+
try {
// Clear OP_WRITE
setInterestedInWrite(session, false);
do {
// Check for pending writes.
req = session.getCurrentWriteRequest();
+
if (req == null) {
req = writeRequestQueue.poll(session);
+
if (req == null) {
break;
}
+
session.setCurrentWriteRequest(req);
}
int localWrittenBytes = 0;
Object message = req.getMessage();
+
if (message instanceof IoBuffer) {
localWrittenBytes = writeBuffer(session, req,
hasFragmentation, maxWrittenBytes - writtenBytes,
currentTime);
+
if (localWrittenBytes > 0
&& ((IoBuffer) message).hasRemaining()) {
// the buffer isn't empty, we re-interest it in writing
@@ -891,6 +906,7 @@
if (req != null) {
req.getFuture().setException(e);
}
+
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
return false;
@@ -904,15 +920,19 @@
throws Exception {
IoBuffer buf = (IoBuffer) req.getMessage();
int localWrittenBytes = 0;
+
if (buf.hasRemaining()) {
int length;
+
if (hasFragmentation) {
length = Math.min(buf.remaining(), maxLength);
} else {
length = buf.remaining();
}
+
for (int i = WRITE_SPIN_COUNT; i > 0; i--) {
localWrittenBytes = write(session, buf, length);
+
if (localWrittenBytes != 0) {
break;
}
@@ -934,14 +954,17 @@
throws Exception {
int localWrittenBytes;
FileRegion region = (FileRegion) req.getMessage();
+
if (region.getRemainingBytes() > 0) {
int length;
+
if (hasFragmentation) {
length = (int) Math.min(region.getRemainingBytes(), maxLength);
} else {
length = (int) Math.min(Integer.MAX_VALUE, region
.getRemainingBytes());
}
+
localWrittenBytes = transferFile(session, region, length);
region.update(localWrittenBytes);
} else {
@@ -965,7 +988,7 @@
}
/**
- * Update the trafficControl for all the session which has just been
opened.
+ * Update the trafficControl for all the sessions.
*/
private void updateTrafficMask() {
int queueSize = trafficControllingSessions.size();
@@ -981,27 +1004,30 @@
SessionState state = getState(session);
switch (state) {
- case OPENED:
- updateTrafficControl(session);
- break;
-
- case CLOSING:
- break;
-
- case OPENING:
- // Retry later if session is not yet fully initialized.
- // (In case that Session.suspend??() or session.resume??() is
- // called before addSession() is processed)
- // We just put back the session at the end of the queue.
- trafficControllingSessions.add(session);
- break;
+ case OPENED:
+ updateTrafficControl(session);
- default:
- throw new IllegalStateException(String.valueOf(state));
+ break;
+
+ case CLOSING:
+ break;
+
+ case OPENING:
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.suspend??() or session.resume??()
is
+ // called before addSession() is processed)
+ // We just put back the session at the end of the queue.
+ trafficControllingSessions.add(session);
+ break;
+
+ default:
+ throw new IllegalStateException(String.valueOf(state));
}
-
+
// As we have handled one session, decrement the number of
- // remaining sessions.
+ // remaining sessions. The OPENING session will be processed
+ // with the next select(), as the queue size has been decreased,
even
+ // if the session has been pushed at the end of the queue
queueSize--;
}
}
@@ -1028,6 +1054,12 @@
}
}
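+ /**
+ * The main loop. This is the place in charge of polling the Selector and of
+ * processing the active sessions. On each iteration it handles the newly
+ * created sessions, updates the traffic masks, processes the selected
+ * sessions, flushes the pending writes, removes sessions and sends idle events.
+ */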
private class Processor implements Runnable {
public void run() {
int nSessions = 0;
@@ -1056,8 +1088,7 @@
// we can reselect immediately
continue;
} else {
- LOG
- .warn("Create a new selector.
Selected is 0, delta = "
+ LOG.warn("Create a new selector.
Selected is 0, delta = "
+ (t1 - t0));
// Ok, we are hit by the nasty epoll
// spinning.
@@ -1092,21 +1123,29 @@
wakeupCalled.getAndSet(false);
}
+ // Manage newly created session first
nSessions += handleNewSessions();
updateTrafficMask();
// Now, if we have had some incoming or outgoing events,
// deal with them
if (selected > 0) {
- // System.out.println( "Proccessing ...");
+ //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter tests...
process();
}
+ // Write the pending requests
long currentTime = System.currentTimeMillis();
flush(currentTime);
+
+ // And manage removed sessions
nSessions -= removeSessions();
+
+ // Last but not least, send Idle events to the idle sessions
notifyIdleSessions(currentTime);
+ // Get a chance to exit the infinite loop if there are no
+ // more sessions on this Processor
if (nSessions == 0) {
synchronized (lock) {
if (newSessions.isEmpty() && isSelectorEmpty()) {
@@ -1122,6 +1161,7 @@
for (Iterator<T> i = allSessions(); i.hasNext();) {
scheduleRemove(i.next());
}
+
wakeup();
}
} catch (Throwable t) {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=899977&r1=899976&r2=899977&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
Sat Jan 16 15:27:24 2010
@@ -53,6 +53,7 @@
*/
public NioProcessor(Executor executor) {
super(executor);
+
try {
// Open a new selector
selector = Selector.open();