Author: trustin
Date: Sun Mar 16 23:39:20 2008
New Revision: 637741

URL: http://svn.apache.org/viewvc?rev=637741&view=rev
Log:
Fixed issue: DIRMINA-552 - writtenBytes and lastWriteTime are not updated 
immediately.
* Modified every flush operation to call increaseWrittenBytes when it returns - 
seems to work fine


Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=637741&r1=637740&r2=637741&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java 
Sun Mar 16 23:39:20 2008
@@ -530,29 +530,7 @@
         }
     }
 
-    protected final void increaseWrittenBytesAndMessages(
-            WriteRequest request, long currentTime) {
-        
-        Object message = request.getMessage();
-        if (message instanceof IoBuffer) {
-            IoBuffer b = (IoBuffer) message;
-            if (b.hasRemaining()) {
-                increaseWrittenBytes(((IoBuffer) message).remaining(), 
currentTime);
-            } else {
-                increaseWrittenMessages(currentTime);
-            }
-        } else if (message instanceof FileRegion) {
-            FileRegion region = (FileRegion) message;
-            if (region.getRemainingBytes() == 0) {
-                increaseWrittenBytes(region.getWrittenBytes(), currentTime);
-                increaseWrittenMessages(currentTime);
-            }
-        } else {
-            increaseWrittenMessages(currentTime);
-        }
-    }
-    
-    private void increaseWrittenBytes(long increment, long currentTime) {
+    protected final void increaseWrittenBytes(long increment, long 
currentTime) {
         if (increment <= 0) {
             return;
         }
@@ -569,7 +547,16 @@
         increaseScheduledWriteBytes(-increment);
     }
 
-    private void increaseWrittenMessages(long currentTime) {
+    protected final void increaseWrittenMessages(
+            WriteRequest request, long currentTime) {
+        Object message = request.getMessage();
+        if (message instanceof IoBuffer) {
+            IoBuffer b = (IoBuffer) message;
+            if (b.hasRemaining()) {
+                return;
+            }
+        }
+
         writtenMessages++;
         lastWriteTime = currentTime;
         if (getService() instanceof AbstractIoService) {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java?rev=637741&r1=637740&r2=637741&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
 Sun Mar 16 23:39:20 2008
@@ -303,10 +303,11 @@
                         processReadySessions(selectedHandles());
                     }
 
-                    flushSessions();
+                    long currentTime = System.currentTimeMillis();
+                    flushSessions(currentTime);
                     nHandles -= unregisterHandles();
 
-                    notifyIdleSessions();
+                    notifyIdleSessions(currentTime);
 
                     if (nHandles == 0) {
                         synchronized (lock) {
@@ -379,7 +380,7 @@
         }
     }
 
-    private void flushSessions() {
+    private void flushSessions(long currentTime) {
         for (; ;) {
             T session = flushingSessions.poll();
             if (session == null) {
@@ -389,7 +390,7 @@
             session.setScheduledForFlush(false);
 
             try {
-                boolean flushedAll = flush(session);
+                boolean flushedAll = flush(session, currentTime);
                 if (flushedAll && 
!session.getWriteRequestQueue().isEmpty(session) &&
                     !session.isScheduledForFlush()) {
                     scheduleFlush(session);
@@ -400,55 +401,58 @@
         }
     }
 
-    private boolean flush(T session) throws Exception {
+    private boolean flush(T session, long currentTime) throws Exception {
         // Clear OP_WRITE
         setInterestedInWrite(session, false);
         
-        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
-
-        int maxWrittenBytes =
+        final WriteRequestQueue writeRequestQueue = 
session.getWriteRequestQueue();
+        final int maxWrittenBytes =
             session.getConfig().getMaxReadBufferSize() +
             (session.getConfig().getMaxReadBufferSize() >>> 1);
 
         int writtenBytes = 0;
-        for (; ;) {
-            WriteRequest req = session.getCurrentWriteRequest();
-            if (req == null) {
-                req = writeRequestQueue.poll(session);
+        try {
+            for (; ;) {
+                WriteRequest req = session.getCurrentWriteRequest();
                 if (req == null) {
-                    break;
+                    req = writeRequestQueue.poll(session);
+                    if (req == null) {
+                        break;
+                    }
+                    session.setCurrentWriteRequest(req);
+                }
+    
+                IoBuffer buf = (IoBuffer) req.getMessage();
+                if (buf.remaining() == 0) {
+                    // Clear and fire event
+                    session.setCurrentWriteRequest(null);
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(req);
+                    continue;
+                }
+    
+                SocketAddress destination = req.getDestination();
+                if (destination == null) {
+                    destination = session.getRemoteAddress();
+                }
+    
+                int localWrittenBytes = send(session, buf, destination);
+                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) 
{
+                    // Kernel buffer is full or wrote too much
+                    setInterestedInWrite(session, true);
+                    return false;
+                } else {
+                    setInterestedInWrite(session, false);
+    
+                    // Clear and fire event
+                    session.setCurrentWriteRequest(null);
+                    writtenBytes += localWrittenBytes;
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(req);
                 }
-                session.setCurrentWriteRequest(req);
-            }
-
-            IoBuffer buf = (IoBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
-                // Clear and fire event
-                session.setCurrentWriteRequest(null);
-                buf.reset();
-                session.getFilterChain().fireMessageSent(req);
-                continue;
-            }
-
-            SocketAddress destination = req.getDestination();
-            if (destination == null) {
-                destination = session.getRemoteAddress();
-            }
-
-            int localWrittenBytes = send(session, buf, destination);
-            if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
-                // Kernel buffer is full or wrote too much
-                setInterestedInWrite(session, true);
-                return false;
-            } else {
-                setInterestedInWrite(session, false);
-
-                // Clear and fire event
-                session.setCurrentWriteRequest(null);
-                writtenBytes += localWrittenBytes;
-                buf.reset();
-                session.getFilterChain().fireMessageSent(req);
             }
+        } finally {
+            session.increaseWrittenBytes(writtenBytes, currentTime);
         }
 
         return true;
@@ -524,9 +528,8 @@
         return nHandles;
     }
 
-    private void notifyIdleSessions() {
+    private void notifyIdleSessions(long currentTime) {
         // process idle sessions
-        long currentTime = System.currentTimeMillis();
         if (currentTime - lastIdleCheckTime >= 1000) {
             lastIdleCheckTime = currentTime;
             IdleStatusChecker.notifyIdleness(

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java?rev=637741&r1=637740&r2=637741&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoProcessor.java
 Sun Mar 16 23:39:20 2008
@@ -203,7 +203,7 @@
         if (Thread.currentThread() == workerThread) {
             // Bypass the queue if called from the worker thread itself
             // (i.e. single thread model).
-            flushNow(session);
+            flushNow(session, System.currentTimeMillis());
         } else {
             boolean needsWakeup = flushingSessions.isEmpty();
             if (scheduleFlush(session) && needsWakeup) {
@@ -451,16 +451,15 @@
         }
     }
 
-    private void notifyIdleSessions() throws Exception {
+    private void notifyIdleSessions(long currentTime) throws Exception {
         // process idle sessions
-        long currentTime = System.currentTimeMillis();
         if (currentTime - lastIdleCheckTime >= 1000) {
             lastIdleCheckTime = currentTime;
             IdleStatusChecker.notifyIdleness(allSessions(), currentTime);
         }
     }
 
-    private void flush() {
+    private void flush(long currentTime) {
         for (; ;) {
             T session = flushingSessions.poll();
 
@@ -473,7 +472,7 @@
             switch (state) {
             case OPEN:
                 try {
-                    boolean flushedAll = flushNow(session);
+                    boolean flushedAll = flushNow(session, currentTime);
                     if (flushedAll && 
!session.getWriteRequestQueue().isEmpty(session) &&
                         !session.isScheduledForFlush()) {
                         scheduleFlush(session);
@@ -497,7 +496,7 @@
         }
     }
 
-    private boolean flushNow(T session) {
+    private boolean flushNow(T session, long currentTime) {
         if (!session.isConnected()) {
             scheduleRemove(session);
             return false;
@@ -506,18 +505,17 @@
         final boolean hasFragmentation = 
             session.getTransportMetadata().hasFragmentation();
 
+        final WriteRequestQueue writeRequestQueue = 
session.getWriteRequestQueue();
+        
+        // Set limitation for the number of written bytes for read-write
+        // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
+        // performance in my experience while not breaking fairness much.
+        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() 
+
+                              (session.getConfig().getMaxReadBufferSize() >>> 
1);
+        int writtenBytes = 0;
         try {
             // Clear OP_WRITE
             setInterestedInWrite(session, false);
-    
-            WriteRequestQueue writeRequestQueue = 
session.getWriteRequestQueue();
-    
-            // Set limitation for the number of written bytes for read-write
-            // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
-            // performance in my experience while not breaking fairness much.
-            int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
-                                  (session.getConfig().getMaxReadBufferSize() 
>>> 1);
-            int writtenBytes = 0;
             do {
                 // Check for pending writes.
                 WriteRequest req = session.getCurrentWriteRequest();
@@ -540,7 +538,7 @@
                             session, req, hasFragmentation,
                             maxWrittenBytes - writtenBytes);
                 } else {
-                       throw new IllegalStateException("Don't know how to 
handle message of type '" + message.getClass().getName() + "'.  Are you missing 
a protocol encoder?");
+                    throw new IllegalStateException("Don't know how to handle 
message of type '" + message.getClass().getName() + "'.  Are you missing a 
protocol encoder?");
                 }
                 
                 writtenBytes += localWrittenBytes;
@@ -554,6 +552,8 @@
         } catch (Exception e) {
             session.getFilterChain().fireExceptionCaught(e);
             return false;
+        } finally {
+            session.increaseWrittenBytes(writtenBytes, currentTime);
         }
 
         return true;
@@ -686,9 +686,10 @@
                         process();
                     }
 
-                    flush();
+                    long currentTime = System.currentTimeMillis();
+                    flush(currentTime);
                     nSessions -= remove();
-                    notifyIdleSessions();
+                    notifyIdleSessions(currentTime);
 
                     if (nSessions == 0) {
                         synchronized (lock) {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=637741&r1=637740&r2=637741&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java 
Sun Mar 16 23:39:20 2008
@@ -410,7 +410,7 @@
     }
 
     public void fireMessageSent(WriteRequest request) {
-        session.increaseWrittenBytesAndMessages(request, 
System.currentTimeMillis());
+        session.increaseWrittenMessages(request, System.currentTimeMillis());
 
         try {
             request.getFuture().setWritten();
@@ -421,7 +421,7 @@
         Entry head = this.head;
         callNextMessageSent(head, session, request);
     }
-
+    
     private void callNextMessageSent(Entry entry, IoSession session,
             WriteRequest writeRequest) {
         try {


Reply via email to