Author: jvermillard
Date: Thu Jul 26 07:24:09 2007
New Revision: 559826

URL: http://svn.apache.org/viewvc?view=rev&rev=559826
Log:
write works now, without big optimization (lot of useless synchronize)

Modified:
    
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
    
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
    
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
    
mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java

Modified: 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
 (original)
+++ 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
 Thu Jul 26 07:24:09 2007
@@ -16,7 +16,6 @@
 
        @Override
        protected void doWrite(IoSession session, WriteRequest writeRequest) {
-               System.err.println("dowrite ????");
                APRSessionImpl s = (APRSessionImpl) session;
                Queue<WriteRequest> writeRequestQueue = 
s.getWriteRequestQueue();
 

Modified: 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
 (original)
+++ 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
 Thu Jul 26 07:24:09 2007
@@ -61,7 +61,10 @@
                try {
 
                        // TODO : optimize/parametrize those values
-                       pollset = Poll.create(32, pool, 
Poll.APR_POLLSET_THREADSAFE, 10000000);
+                       synchronized (this) {
+                               pollset = Poll.create(32, pool, /* obviously 
doesn't work..*/ Poll.APR_POLLSET_THREADSAFE, 10000000);   
+                       }
+                       
                } catch (Error e) {
                        logger.error("APR Error : " + e.getDescription(), e);
                        // TODO : send that to the good logger
@@ -89,27 +92,16 @@
        }
 
        void flush(APRSessionImpl session) {
-               //scheduleFlush(session);
-
-               // add the descriptor as POLLOUT
-               
-               int rv = Poll.add(pollset, session.getAPRSocket(), 
Poll.APR_POLLIN
-               | Poll.APR_POLLOUT);
-               if (rv == Status.APR_SUCCESS) {
-                       System.err.println("pollout Ok");
-               } else {
-                       System.err.println("");
-               }
-
+               scheduleFlush(session);
        }
-
+    
        private void scheduleRemove(APRSessionImpl session) {
                removingSessions.offer(session);
        }
 
-//     private void scheduleFlush(APRSessionImpl session) {
-//             flushingSessions.offer(session);
-//     }
+       private void scheduleFlush(APRSessionImpl session) {
+               flushingSessions.offer(session);
+       }
 
        
        // TODO : do something with traffic control 
@@ -129,8 +121,12 @@
                        // FIXME : perhaps we should oll write only if needed 
for save CPU, but actually it's too complex for me :)
                        System.err.println("pollset : "+pollset);
                        System.err.println("Socket : "+session.getAPRSocket());
-                       int rv = Poll.add(pollset, session.getAPRSocket(), 
Poll.APR_POLLIN
-                                       /*| Poll.APR_POLLOUT*/);
+                       int rv;
+                       synchronized (this) {
+                               rv = Poll.add(pollset, session.getAPRSocket(), 
Poll.APR_POLLIN
+               
+                               /*| Poll.APR_POLLOUT*/);
+                       }
                        if (rv == Status.APR_SUCCESS) {
                                System.out.println("Added worker to pollset");
                                managedSessions.put(session.getAPRSocket(), 
session);
@@ -142,7 +138,7 @@
                                // FIXME: find a way to bring the real APR 
error from returned codes
                                
session.getFilterChain().fireExceptionCaught(session,
                                                new RuntimeException("APR 
Error"));
-                       }
+                       }       
                }
        }
 
@@ -155,14 +151,53 @@
                        }
 
                        // remove of the pollset
-                       Poll.remove(pollset, session.getAPRSocket());
-
+                       synchronized (this) {
+                               Poll.remove(pollset, session.getAPRSocket());   
+                       }
+                       
                        // close the socket
                        Socket.close(session.getAPRSocket());
                        clearWriteRequestQueue(session);
                        
getServiceListeners(session).fireSessionDestroyed(session);
                }
        }
+       
+    private void doFlush() {
+        if (flushingSessions.size() == 0) {
+            return;
+        }
+
+        for (;;) {
+            APRSessionImpl session = flushingSessions.poll();
+
+            if (session == null) {
+                break;
+            }
+
+            if (!session.isConnected()) {
+                clearWriteRequestQueue(session);
+                continue;
+            }
+            
+            // add the descriptor as POLLOUT
+               synchronized (this) {
+                               int rv= Poll.remove(pollset, 
session.getAPRSocket());
+                               if(rv!=Status.APR_SUCCESS) {
+                                       System.err.println("poll.remove Error : 
"+Error.strerror(rv));
+                               }
+                               rv = Poll.add(pollset, 
session.getAPRSocket(),Poll.APR_POLLIN | Poll.APR_POLLOUT);
+                               if (rv == Status.APR_SUCCESS) {
+                                       // ok
+                               } else {
+                                       System.err.println("poll.add Error : 
"+Error.strerror(rv));
+                               }
+
+               }
+               
+            
+        }
+    }
+
 
        private void read(APRSessionImpl session) {
                byte[] buf = session.getReadBuffer();
@@ -181,10 +216,8 @@
        }
 
        private void write(APRSessionImpl session) {
-               //System.err.println("Do write");
                if (session.getWriteRequestQueue().size() <= 0)
                        return;
-               System.err.println("Ok something in the queue");
                Queue<WriteRequest> writeRequestQueue = 
session.getWriteRequestQueue();
                
                for (;;) {
@@ -196,6 +229,17 @@
                        }
 
                        if (req == null) {
+                               // remove of write polling
+                               int rv= Poll.remove(pollset, 
session.getAPRSocket());
+                       if(rv!=Status.APR_SUCCESS) {
+                               System.err.println("poll.remove Error : 
"+Error.strerror(rv));
+                       }
+                       rv = Poll.add(pollset, 
session.getAPRSocket(),Poll.APR_POLLIN);
+                       if (rv == Status.APR_SUCCESS) {
+                               // ok
+                       } else {
+                               System.err.println("poll.add Error : 
"+Error.strerror(rv));
+                       }
                                break;
                        }
 
@@ -211,11 +255,25 @@
                        }
                        // be sure APR_SO_NONBLOCK was set, or it will block
                        int toWrite = buf.remaining();
-                       int writtenBytes = Socket.sendb(session.getAPRSocket(), 
buf.buf(),
-                                       0, toWrite);
+                       
+                       int writtenBytes;
+                       // APR accept ByteBuffer, only if they are Direct ones, 
due to native code
+                       if(buf.isDirect()) {
+                               writtenBytes = 
Socket.sendb(session.getAPRSocket(), buf.buf(),
+                                               0, toWrite);
+                       } else {
+                               writtenBytes = 
Socket.send(session.getAPRSocket(), buf.array(),
+                                               0, toWrite);
+                               // FIXME : kludgy ?
+                               buf.position(buf.position()+writtenBytes);
+                       }
                        if (writtenBytes > 0) {
                                // increase
+                               
                                session.increaseWrittenBytes(writtenBytes);
+                       } else {
+                               // FIXME : send the exception
+                               
System.err.println(Error.strerror(writtenBytes*-1));
                        }
 
                        // kernel buffer full for this socket, wait next polling
@@ -305,7 +363,12 @@
                                        long[] desc = new long[socketCount * 2];
 
                                        /* use 100 milliseconds poll timeout, 
TODO : parametrize for more latency/CPU usage control*/
-                                       int rv = Poll.poll(pollset, 100000, 
desc, false);
+                                       int rv;
+                                       synchronized (this) {
+                                               rv = Poll.poll(pollset, 100000, 
desc, false);
+                                       }
+                                       //System.err.println("rv poll : "+rv+" 
- "+Thread.currentThread());
+                                       
                                        if (rv > 0) {
                                                for (int n = 0; n < rv; n++) {
                                                        long clientSock = 
desc[n * 2 + 1];
@@ -325,6 +388,7 @@
                                                                write(session);
                                                }
                                        }
+                                       doFlush();
                                        notifyIdleness();
                                        doRemove();
                                } catch (Throwable t) {

Modified: 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
 (original)
+++ 
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
 Thu Jul 26 07:24:09 2007
@@ -130,4 +130,8 @@
                        APRSessionConfig {
 
        }
+       @Override
+       protected void write0(WriteRequest writeRequest) {
+               filterChain.fireFilterWrite(this, writeRequest);
+       }
 }

Modified: 
mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- 
mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java
 (original)
+++ 
mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java
 Thu Jul 26 07:24:09 2007
@@ -12,6 +12,7 @@
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.SocketConnector;
 import org.apache.tomcat.jni.Library;
 
 public class TestCnx extends TestCase {
@@ -20,6 +21,7 @@
                BasicConfigurator.configure();
                Library.initialize(null);
                APRConnector cnx=new APRConnector();
+               //SocketConnector cnx=new SocketConnector(); 
                cnx.setHandler(new IoHandler(){
 
                        public void exceptionCaught(IoSession session, 
Throwable cause) throws Exception {
@@ -66,7 +68,6 @@
                Thread.sleep(1000);
                System.err.println("writing hello");
                f.getSession().write(  
ByteBuffer.wrap("HELLO\n".getBytes()).rewind() );
-               
                Thread.sleep(4000);
                System.err.println("Done");
        }


Reply via email to