Author: jvermillard
Date: Thu Jul 26 07:33:15 2007
New Revision: 559833

URL: http://svn.apache.org/viewvc?view=rev&rev=559833
Log:
less write synchronize, and call Poll.add for writing in the filterchain, for 
better latency (was previously done in the APRioP Worker thread)

Modified:
    
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java

Modified: 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?view=diff&rev=559833&r1=559832&r2=559833
==============================================================================
--- 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
 (original)
+++ 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
 Thu Jul 26 07:33:15 2007
@@ -40,8 +40,6 @@
 
        private final Queue<APRSessionImpl> removingSessions = new 
ConcurrentLinkedQueue<APRSessionImpl>();
 
-       private final Queue<APRSessionImpl> flushingSessions = new 
ConcurrentLinkedQueue<APRSessionImpl>();
-
        private final Queue<APRSessionImpl> trafficControllingSessions = new 
ConcurrentLinkedQueue<APRSessionImpl>();
 
        private final Map<Long, APRSessionImpl> managedSessions = new 
HashMap<Long, APRSessionImpl>();
@@ -61,9 +59,7 @@
                try {
 
                        // TODO : optimize/parametrize those values
-                       synchronized (this) {
-                               pollset = Poll.create(32, pool, /* obviously 
doesn't work..*/ Poll.APR_POLLSET_THREADSAFE, 10000000);   
-                       }
+                       pollset = Poll.create(32, pool, /* obviously doesn't 
work..*/ Poll.APR_POLLSET_THREADSAFE, 10000000);   
                        
                } catch (Error e) {
                        logger.error("APR Error : " + e.getDescription(), e);
@@ -92,18 +88,14 @@
        }
 
        void flush(APRSessionImpl session) {
-               scheduleFlush(session);
+               // re-add the session to polling with POLLOUT flag 
+               pollOutSession(session);
        }
     
        private void scheduleRemove(APRSessionImpl session) {
                removingSessions.offer(session);
        }
 
-       private void scheduleFlush(APRSessionImpl session) {
-               flushingSessions.offer(session);
-       }
-
-       
        // TODO : do something with traffic control 
        private void scheduleTrafficControl(APRSessionImpl session) {
                trafficControllingSessions.offer(session);
@@ -122,11 +114,7 @@
                        System.err.println("pollset : "+pollset);
                        System.err.println("Socket : "+session.getAPRSocket());
                        int rv;
-                       synchronized (this) {
-                               rv = Poll.add(pollset, session.getAPRSocket(), 
Poll.APR_POLLIN
-               
-                               /*| Poll.APR_POLLOUT*/);
-                       }
+                               rv = Poll.add(pollset, session.getAPRSocket(), 
Poll.APR_POLLIN/*| Poll.APR_POLLOUT*/);
                        if (rv == Status.APR_SUCCESS) {
                                System.out.println("Added worker to pollset");
                                managedSessions.put(session.getAPRSocket(), 
session);
@@ -151,9 +139,7 @@
                        }
 
                        // remove of the pollset
-                       synchronized (this) {
-                               Poll.remove(pollset, session.getAPRSocket());   
-                       }
+                       Poll.remove(pollset, session.getAPRSocket());   
                        
                        // close the socket
                        Socket.close(session.getAPRSocket());
@@ -162,43 +148,19 @@
                }
        }
        
-    private void doFlush() {
-        if (flushingSessions.size() == 0) {
-            return;
-        }
-
-        for (;;) {
-            APRSessionImpl session = flushingSessions.poll();
-
-            if (session == null) {
-                break;
-            }
-
-            if (!session.isConnected()) {
-                clearWriteRequestQueue(session);
-                continue;
-            }
-            
-            // add the descriptor as POLLOUT
-               synchronized (this) {
-                               int rv= Poll.remove(pollset, 
session.getAPRSocket());
-                               if(rv!=Status.APR_SUCCESS) {
-                                       System.err.println("poll.remove Error : 
"+Error.strerror(rv));
-                               }
-                               rv = Poll.add(pollset, 
session.getAPRSocket(),Poll.APR_POLLIN | Poll.APR_POLLOUT);
-                               if (rv == Status.APR_SUCCESS) {
-                                       // ok
-                               } else {
-                                       System.err.println("poll.add Error : 
"+Error.strerror(rv));
-                               }
-
-               }
-               
-            
-        }
+    private void pollOutSession(APRSessionImpl session) {
+           int rv= Poll.remove(pollset, session.getAPRSocket());
+               if(rv!=Status.APR_SUCCESS) {
+                       System.err.println("poll.remove Error : 
"+Error.strerror(rv));
+               }
+               rv = Poll.add(pollset, session.getAPRSocket(),Poll.APR_POLLIN | 
Poll.APR_POLLOUT);
+               if (rv == Status.APR_SUCCESS) {
+                       // ok
+               } else {
+                       System.err.println("poll.add Error : 
"+Error.strerror(rv));
+               }
     }
 
-
        private void read(APRSessionImpl session) {
                byte[] buf = session.getReadBuffer();
                // FIXME : hardcoded read value for testing
@@ -363,12 +325,7 @@
                                        long[] desc = new long[socketCount * 2];
 
                                        /* use 100 milliseconds poll timeout, 
TODO : parametrize for more latency/CPU usage control*/
-                                       int rv;
-                                       synchronized (this) {
-                                               rv = Poll.poll(pollset, 100000, 
desc, false);
-                                       }
-                                       //System.err.println("rv poll : "+rv+" 
- "+Thread.currentThread());
-                                       
+                                       int rv = Poll.poll(pollset, 100000, 
desc, false);
                                        if (rv > 0) {
                                                for (int n = 0; n < rv; n++) {
                                                        long clientSock = 
desc[n * 2 + 1];
@@ -388,7 +345,7 @@
                                                                write(session);
                                                }
                                        }
-                                       doFlush();
+                               //      doFlush();
                                        notifyIdleness();
                                        doRemove();
                                } catch (Throwable t) {


Reply via email to