Author: elecharny
Date: Sat Jan 16 15:27:24 2010
New Revision: 899977

URL: http://svn.apache.org/viewvc?rev=899977&view=rev
Log:
o Javadoc addition
o Removed a useless method (scheduleTrafficControl)
o Got rid of most of the for(;;) constructs where they were not necessary
o Added some comments
o Refactored the code to make it easier on the eye

Modified:
    
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=899977&r1=899976&r2=899977&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Sat Jan 16 15:27:24 2010
@@ -35,9 +35,11 @@
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.file.FileRegion;
 import org.apache.mina.core.filterchain.IoFilterChain;
+import org.apache.mina.core.filterchain.IoFilterChainBuilder;
 import org.apache.mina.core.future.DefaultIoFuture;
 import org.apache.mina.core.service.AbstractIoService;
 import org.apache.mina.core.service.IoProcessor;
+import org.apache.mina.core.service.IoServiceListenerSupport;
 import org.apache.mina.core.session.AbstractIoSession;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.core.session.IoSessionConfig;
@@ -62,8 +64,7 @@
 public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession>
         implements IoProcessor<T> {
     /** A logger for this class */
-    private final static Logger LOG = LoggerFactory
-            .getLogger(IoProcessor.class);
+    private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
 
     /**
      * The maximum loop count for a write operation until
@@ -82,10 +83,13 @@
     /** A map containing the last Thread ID for each class */
     private static final Map<Class<?>, AtomicInteger> threadIds = new HashMap<Class<?>, AtomicInteger>();
 
+    /** A lock used to protect the processor creation */
     private final Object lock = new Object();
 
+    /** This IoProcessor instance name */
     private final String threadName;
 
+    /** The executor to use when we need to start the inner Processor */
     private final Executor executor;
 
     /** A Session queue containing the newly created sessions */
@@ -329,10 +333,8 @@
     /**
      * Initialize the polling of a session. Add it to the polling process.
      * 
-     * @param session
-     *            the {@link IoSession} to add to the polling
-     * @throws Exception
-     *             any exception thrown by the underlying system calls
+     * @param session the {@link IoSession} to add to the polling
+     * @throws Exception any exception thrown by the underlying system calls
      */
     protected abstract void init(T session) throws Exception;
 
@@ -426,30 +428,25 @@
      * {@inheritDoc}
      */
     public final void flush(T session) {
-        if (scheduleFlush(session)) {
+        if (session.setScheduledForFlush(true)) {
+            flushingSessions.add(session);
             wakeup();
         }
     }
 
-    private boolean scheduleFlush(T session) {
+    private void scheduleFlush(T session) {
         if (session.setScheduledForFlush(true)) {
             // add the session to the queue
             flushingSessions.add(session);
-            return true;
         }
-        return false;
     }
 
     /**
      * {@inheritDoc}
      */
     public final void updateTrafficMask(T session) {
-        scheduleTrafficControl(session);
-        wakeup();
-    }
-
-    private void scheduleTrafficControl(T session) {
         trafficControllingSessions.add(session);
+        wakeup();
     }
 
     /**
@@ -460,8 +457,7 @@
         synchronized (lock) {
             if (processor == null) {
                 processor = new Processor();
-                executor.execute(new NamePreservingRunnable(processor,
-                        threadName));
+                executor.execute(new NamePreservingRunnable(processor, threadName));
             }
         }
 
@@ -500,14 +496,7 @@
     private int handleNewSessions() {
         int addedSessions = 0;
 
-        for (;;) {
-            T session = newSessions.poll();
-
-            if (session == null) {
-                // All new sessions have been handled
-                break;
-            }
-
+        for (T session = newSessions.poll(); session != null; session = newSessions.poll()) {
             if (addNow(session)) {
                 // A new session has been created
                 addedSessions++;
@@ -517,6 +506,15 @@
         return addedSessions;
     }
 
+    /**
+     * Process a new session :
+     * - initialize it
+     * - create its chain
+     * - fire the CREATED listeners if any
+     *
+     * @param session The session to create
+     * @return true if the session has been registered
+     */
     private boolean addNow(T session) {
         boolean registered = false;
         boolean notified = false;
@@ -526,13 +524,15 @@
             registered = true;
 
             // Build the filter chain of this session.
-            session.getService().getFilterChainBuilder().buildFilterChain(
-                    session.getFilterChain());
+            IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
+            chainBuilder.buildFilterChain(session.getFilterChain());
 
             // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
             // in AbstractIoFilterChain.fireSessionOpened().
-            ((AbstractIoService) session.getService()).getListeners()
-                    .fireSessionCreated(session);
+            // Propagate the SESSION_CREATED event up to the chain
+            IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
+            listeners.fireSessionCreated(session);
+            
             notified = true;
         } catch (Throwable e) {
             if (notified) {
@@ -544,6 +544,7 @@
                 wakeup();
             } else {
                 ExceptionMonitor.getInstance().exceptionCaught(e);
+                
                 try {
                     destroy(session);
                 } catch (Exception e1) {
@@ -553,50 +554,47 @@
                 }
             }
         }
+        
         return registered;
     }
 
     private int removeSessions() {
         int removedSessions = 0;
 
-        for (;;) {
-            T session = removingSessions.poll();
-
-            if (session == null) {
-                // No session to remove. Get out.
-                return removedSessions;
-            }
-
+        for (T session = removingSessions.poll();session != null;session = removingSessions.poll()) {
             SessionState state = getState(session);
 
             // Now deal with the removal accordingly to the session's state
             switch (state) {
-            case OPENED:
-                // Try to remove this session
-                if (removeNow(session)) {
-                    removedSessions++;
-                }
-
-                break;
-
-            case CLOSING:
-                // Skip if channel is already closed
-                break;
-
-            case OPENING:
-                // Remove session from the newSessions queue and
-                // remove it
-                newSessions.remove(session);
-
-                if (removeNow(session)) {
-                    removedSessions++;
-                }
-                break;
-
-            default:
-                throw new IllegalStateException(String.valueOf(state));
+                case OPENED:
+                    // Try to remove this session
+                    if (removeNow(session)) {
+                        removedSessions++;
+                    }
+    
+                    break;
+    
+                case CLOSING:
+                    // Skip if channel is already closed
+                    break;
+    
+                case OPENING:
+                    // Remove session from the newSessions queue and
+                    // remove it
+                    newSessions.remove(session);
+    
+                    if (removeNow(session)) {
+                        removedSessions++;
+                    }
+                    
+                    break;
+    
+                default:
+                    throw new IllegalStateException(String.valueOf(state));
             }
         }
+        
+        return removedSessions;
     }
 
     private boolean removeNow(T session) {
@@ -623,9 +621,10 @@
         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
 
         if ((req = writeRequestQueue.poll(session)) != null) {
-            Object m = req.getMessage();
-            if (m instanceof IoBuffer) {
-                IoBuffer buf = (IoBuffer) req.getMessage();
+            Object message = req.getMessage();
+            
+            if (message instanceof IoBuffer) {
+                IoBuffer buf = (IoBuffer)message;
 
                 // The first unwritten empty buffer must be
                 // forwarded to the filter chain.
@@ -650,10 +649,12 @@
         if (!failedRequests.isEmpty()) {
             WriteToClosedSessionException cause = new WriteToClosedSessionException(
                     failedRequests);
+            
             for (WriteRequest r : failedRequests) {
                 session.decreaseScheduledBytesAndMessages(r);
                 r.getFuture().setException(cause);
             }
+            
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(cause);
         }
@@ -677,8 +678,9 @@
         }
 
         // Process writes
-        if (isWritable(session) && !session.isWriteSuspended()) {
-            scheduleFlush(session);
+        if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
+            // add the session to the queue
+            flushingSessions.add(session);
         }
     }
 
@@ -697,12 +699,14 @@
                 if (hasFragmentation) {
                     while ((ret = read(session, buf)) > 0) {
                         readBytes += ret;
+                        
                         if (!buf.hasRemaining()) {
                             break;
                         }
                     }
                 } else {
                     ret = read(session, buf);
+                    
                     if (ret > 0) {
                         readBytes = ret;
                     }
@@ -731,12 +735,10 @@
         } catch (Throwable e) {
             if (e instanceof IOException) {
                 if (!(e instanceof PortUnreachableException)
-                        || !AbstractDatagramSessionConfig.class
-                                .isAssignableFrom(config.getClass())
-                        || ((AbstractDatagramSessionConfig) config)
-                                .isCloseOnPortUnreachable())
-
+                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
+                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                     scheduleRemove(session);
+                }
             }
 
             IoFilterChain filterChain = session.getFilterChain();
@@ -752,8 +754,12 @@
         }
     }
 
+    /**
+     * Write all the pending messages
+     */
     private void flush(long currentTime) {
         final T firstSession = flushingSessions.peek();
+        
         if (firstSession == null) {
             return;
         }
@@ -765,41 +771,44 @@
             SessionState state = getState(session);
 
             switch (state) {
-            case OPENED:
-                try {
-                    boolean flushedAll = flushNow(session, currentTime);
-                    if (flushedAll
-                            && !session.getWriteRequestQueue().isEmpty(session)
-                            && !session.isScheduledForFlush()) {
-                        scheduleFlush(session);
+                case OPENED:
+                    try {
+                        boolean flushedAll = flushNow(session, currentTime);
+                        
+                        if (flushedAll
+                                && !session.getWriteRequestQueue().isEmpty(session)
+                                && !session.isScheduledForFlush()) {
+                            scheduleFlush(session);
+                        }
+                    } catch (Exception e) {
+                        scheduleRemove(session);
+                        IoFilterChain filterChain = session.getFilterChain();
+                        filterChain.fireExceptionCaught(e);
                     }
-                } catch (Exception e) {
-                    scheduleRemove(session);
-                    IoFilterChain filterChain = session.getFilterChain();
-                    filterChain.fireExceptionCaught(e);
-                }
-
-                break;
-
-            case CLOSING:
-                // Skip if the channel is already closed.
-                break;
-
-            case OPENING:
-                // Retry later if session is not yet fully initialized.
-                // (In case that Session.write() is called before addSession()
-                // is processed)
-                scheduleFlush(session);
-                return;
-
-            default:
-                throw new IllegalStateException(String.valueOf(state));
+    
+                    break;
+    
+                case CLOSING:
+                    // Skip if the channel is already closed.
+                    break;
+    
+                case OPENING:
+                    // Retry later if session is not yet fully initialized.
+                    // (In case that Session.write() is called before addSession()
+                    // is processed)
+                    scheduleFlush(session);
+                    return;
+    
+                default:
+                    throw new IllegalStateException(String.valueOf(state));
             }
 
             session = flushingSessions.peek();
-            if (session == null || session == firstSession) {
+            
+            if ((session == null) || (session == firstSession)) {
                 break;
             }
+            
             session = flushingSessions.poll();
         }
     }
@@ -823,26 +832,32 @@
                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
         int writtenBytes = 0;
         WriteRequest req = null;
+        
         try {
             // Clear OP_WRITE
             setInterestedInWrite(session, false);
             do {
                 // Check for pending writes.
                 req = session.getCurrentWriteRequest();
+                
                 if (req == null) {
                     req = writeRequestQueue.poll(session);
+                    
                     if (req == null) {
                         break;
                     }
+                    
                     session.setCurrentWriteRequest(req);
                 }
 
                 int localWrittenBytes = 0;
                 Object message = req.getMessage();
+                
                 if (message instanceof IoBuffer) {
                     localWrittenBytes = writeBuffer(session, req,
                             hasFragmentation, maxWrittenBytes - writtenBytes,
                             currentTime);
+                    
                     if (localWrittenBytes > 0
                             && ((IoBuffer) message).hasRemaining()) {
                         // the buffer isn't empty, we re-interest it in writing
@@ -891,6 +906,7 @@
             if (req != null) {
                 req.getFuture().setException(e);
             }
+            
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(e);
             return false;
@@ -904,15 +920,19 @@
             throws Exception {
         IoBuffer buf = (IoBuffer) req.getMessage();
         int localWrittenBytes = 0;
+        
         if (buf.hasRemaining()) {
             int length;
+            
             if (hasFragmentation) {
                 length = Math.min(buf.remaining(), maxLength);
             } else {
                 length = buf.remaining();
             }
+            
             for (int i = WRITE_SPIN_COUNT; i > 0; i--) {
                 localWrittenBytes = write(session, buf, length);
+                
                 if (localWrittenBytes != 0) {
                     break;
                 }
@@ -934,14 +954,17 @@
             throws Exception {
         int localWrittenBytes;
         FileRegion region = (FileRegion) req.getMessage();
+        
         if (region.getRemainingBytes() > 0) {
             int length;
+            
             if (hasFragmentation) {
                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
             } else {
                 length = (int) Math.min(Integer.MAX_VALUE, region
                         .getRemainingBytes());
             }
+            
             localWrittenBytes = transferFile(session, region, length);
             region.update(localWrittenBytes);
         } else {
@@ -965,7 +988,7 @@
     }
 
     /**
-     * Update the trafficControl for all the session which has just been opened.
+     * Update the trafficControl for all the session.
      */
     private void updateTrafficMask() {
         int queueSize = trafficControllingSessions.size();
@@ -981,27 +1004,30 @@
             SessionState state = getState(session);
 
             switch (state) {
-            case OPENED:
-                updateTrafficControl(session);
-                break;
-
-            case CLOSING:
-                break;
-
-            case OPENING:
-                // Retry later if session is not yet fully initialized.
-                // (In case that Session.suspend??() or session.resume??() is
-                // called before addSession() is processed)
-                // We just put back the session at the end of the queue.
-                trafficControllingSessions.add(session);
-                break;
+                case OPENED:
+                    updateTrafficControl(session);
 
-            default:
-                throw new IllegalStateException(String.valueOf(state));
+                    break;
+    
+                case CLOSING:
+                    break;
+    
+                case OPENING:
+                    // Retry later if session is not yet fully initialized.
+                    // (In case that Session.suspend??() or session.resume??() is
+                    // called before addSession() is processed)
+                    // We just put back the session at the end of the queue.
+                    trafficControllingSessions.add(session);
+                    break;
+    
+                default:
+                    throw new IllegalStateException(String.valueOf(state));
             }
-
+            
             // As we have handled one session, decrement the number of
-            // remaining sessions.
+            // remaining sessions. The OPENING session will be processed
+            // with the next select(), as the queue size has been decreased, even
+            // if the session has been pushed at the end of the queue
             queueSize--;
         }
     }
@@ -1028,6 +1054,12 @@
         }
     }
 
+    /**
+     * The main loop. This is the place in charge to poll the Selector, and to 
+     * process the active sessions. It's done in 
+     * - handle the newly created sessions
+     * - 
+     */
     private class Processor implements Runnable {
         public void run() {
             int nSessions = 0;
@@ -1056,8 +1088,7 @@
                                         // we can reselect immediately
                                         continue;
                                     } else {
-                                        LOG
-                                                .warn("Create a new selector. Selected is 0, delta = "
+                                        LOG.warn("Create a new selector. Selected is 0, delta = "
                                                         + (t1 - t0));
                                         // Ok, we are hit by the nasty epoll
                                         // spinning.
@@ -1092,21 +1123,29 @@
                         wakeupCalled.getAndSet(false);
                     }
 
+                    // Manage newly created session first
                     nSessions += handleNewSessions();
                     updateTrafficMask();
 
                     // Now, if we have had some incoming or outgoing events,
                     // deal with them
                     if (selected > 0) {
-                        // System.out.println( "Proccessing ...");
+                        //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                         process();
                     }
 
+                    // Write the pending requests
                     long currentTime = System.currentTimeMillis();
                     flush(currentTime);
+                    
+                    // And manage removed sessions
                     nSessions -= removeSessions();
+                    
+                    // Last, not least, send Idle events to the idle sessions
                     notifyIdleSessions(currentTime);
 
+                    // Get a chance to exit the infinite loop if there are no
+                    // more sessions on this Processor
                     if (nSessions == 0) {
                         synchronized (lock) {
                             if (newSessions.isEmpty() && isSelectorEmpty()) {
@@ -1122,6 +1161,7 @@
                         for (Iterator<T> i = allSessions(); i.hasNext();) {
                             scheduleRemove(i.next());
                         }
+                        
                         wakeup();
                     }
                 } catch (Throwable t) {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=899977&r1=899976&r2=899977&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java Sat Jan 16 15:27:24 2010
@@ -53,6 +53,7 @@
      */
     public NioProcessor(Executor executor) {
         super(executor);
+        
         try {
             // Open a new selector
             selector = Selector.open();


Reply via email to