Author: fhanik
Date: Wed May  9 09:46:06 2007
New Revision: 536580

URL: http://svn.apache.org/viewvc?view=rev&rev=536580
Log:
Separate out read vs write latches, simplify implementation, avoid concurrency 
issues and prepare for new comet strategies


Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?view=diff&rev=536580&r1=536579&r2=536580
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Wed May  9 09:46:06 2007
@@ -66,20 +66,20 @@
                 if ( key == null ) throw new IOException("Key no longer 
registered");
                 KeyAttachment att = (KeyAttachment) key.attachment();
                 try {
-                    if ( att.getLatch()==null || att.getLatch().getCount()==0) 
att.startLatch(1,SelectionKey.OP_WRITE);
+                    if ( att.getWriteLatch()==null || 
att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
                     //only register for write if a write has not yet been 
issued
                     if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0) 
socket.getPoller().add(socket,SelectionKey.OP_WRITE);
-                    
att.awaitLatch(writeTimeout,TimeUnit.MILLISECONDS,SelectionKey.OP_WRITE);
+                    att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
                 }catch (InterruptedException ignore) {
                     Thread.interrupted();
                 }
-                if ( att.getLatch()!=null && att.getLatch().getCount()> 0) {
+                if ( att.getWriteLatch()!=null && 
att.getWriteLatch().getCount()> 0) {
                     //we got interrupted, but we haven't received notification 
from the poller.
                     keycount = 0;
                 }else {
                     //latch countdown has happened
                     keycount = 1;
-                    att.resetLatch();
+                    att.resetWriteLatch();
                 }
 
                 if (writeTimeout > 0 && (keycount == 0))
@@ -135,19 +135,19 @@
                 }
                 KeyAttachment att = (KeyAttachment) key.attachment();
                 try {
-                    if ( att.getLatch()==null || att.getLatch().getCount()==0) 
att.startLatch(1,SelectionKey.OP_READ);
+                    if ( att.getReadLatch()==null || 
att.getReadLatch().getCount()==0) att.startReadLatch(1);
                     if ( att.interestOps() == 0) 
socket.getPoller().add(socket,SelectionKey.OP_READ);
-                    att.awaitLatch(readTimeout,TimeUnit.MILLISECONDS, 
SelectionKey.OP_READ);
+                    att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
                 }catch (InterruptedException ignore) {
                     Thread.interrupted();
                 }
-                if ( att.getLatch()!=null && att.getLatch().getCount()> 0) {
+                if ( att.getReadLatch()!=null && 
att.getReadLatch().getCount()> 0) {
                     //we got interrupted, but we haven't received notification 
from the poller.
                     keycount = 0;
                 }else {
                     //latch countdown has happened
                     keycount = 1;
-                    att.resetLatch();
+                    att.resetReadLatch();
                 }
                 if (readTimeout > 0 && (keycount == 0))
                     timedout = (System.currentTimeMillis() - time) >= 
readTimeout;

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=536580&r1=536579&r2=536580
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed May  9 09:46:06 2007
@@ -1495,9 +1495,12 @@
                     sk.attach(attachment);//cant remember why this is here
                     NioChannel channel = attachment.getChannel();
                     if (sk.isReadable() || sk.isWritable() ) {
-                        if ( attachment.getLatch() != null ) {
-                            unreg(sk, attachment,attachment.getLatchOps());
-                            attachment.getLatch().countDown();
+                        if ( sk.isReadable() && attachment.getReadLatch() != 
null ) {
+                            unreg(sk, attachment,SelectionKey.OP_READ);
+                            attachment.getReadLatch().countDown();
+                        } else if ( sk.isWritable() && 
attachment.getWriteLatch() != null ) {
+                            unreg(sk, attachment,SelectionKey.OP_WRITE);
+                            attachment.getWriteLatch().countDown();
                         } else if ( attachment.getSendfileData() != null ) {
                             processSendfile(sk,attachment,true);
                         } else if ( attachment.getComet() ) {
@@ -1650,9 +1653,10 @@
             fairness = 0;
             lastRegistered = 0;
             sendfileData = null;
-            if ( latch!=null ) try {latch.countDown();}catch (Exception 
ignore){}
-            latch = null;
-            latchOps = 0;
+            if ( readLatch!=null ) try {for (int i=0; 
i<(int)readLatch.getCount();i++) readLatch.countDown();}catch (Exception 
ignore){}
+            readLatch = null;
+            if ( writeLatch!=null ) try {for (int i=0; 
i<(int)writeLatch.getCount();i++) writeLatch.countDown();}catch (Exception 
ignore){}
+            writeLatch = null;
         }
         
         public void reset() {
@@ -1679,25 +1683,32 @@
         protected int interestOps = 0;
         public int interestOps() { return interestOps;}
         public int interestOps(int ops) { this.interestOps  = ops; return ops; 
}
-        public CountDownLatch getLatch() { return latch; }
-        public void resetLatch() { 
-            if ( latch.getCount() == 0 ) latch = null; 
+        public CountDownLatch getReadLatch() { return readLatch; }
+        public CountDownLatch getWriteLatch() { return writeLatch; }
+        protected CountDownLatch resetLatch(CountDownLatch latch) {
+            if ( latch.getCount() == 0 ) return null;
             else throw new IllegalStateException("Latch must be at count 0");
-            latchOps = 0;
         }
-        public void startLatch(int cnt, int latchOps) { 
+        public void resetReadLatch() { readLatch = resetLatch(readLatch); }
+        public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); }
+        
+        protected CountDownLatch startLatch(CountDownLatch latch, int cnt) { 
             if ( latch == null || latch.getCount() == 0 ) {
-                this.latch = new CountDownLatch(cnt);
-                this.latchOps = latchOps;
+                return new CountDownLatch(cnt);
             }
             else throw new IllegalStateException("Latch must be at count 0 or 
null.");
         }
-        public void awaitLatch(long timeout, TimeUnit unit, int latchOps) 
throws InterruptedException {
+        public void startReadLatch(int cnt) { readLatch = 
startLatch(readLatch,cnt);}
+        public void startWriteLatch(int cnt) { writeLatch = 
startLatch(writeLatch,cnt);}
+        
+        
+        protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit 
unit) throws InterruptedException {
             if ( latch == null ) throw new IllegalStateException("Latch cannot 
be null");
-            this.latchOps = this.latchOps | latchOps;
             latch.await(timeout,unit);
         }
-        public int getLatchOps() { return latchOps;}
+        public void awaitReadLatch(long timeout, TimeUnit unit) throws 
InterruptedException { awaitLatch(readLatch,timeout,unit);}
+        public void awaitWriteLatch(long timeout, TimeUnit unit) throws 
InterruptedException { awaitLatch(writeLatch,timeout,unit);}
+        
         public int getFairness() { return fairness; }
         public void setFairness(int f) { fairness = f;}
         public void incFairness() { fairness++; }
@@ -1714,50 +1725,12 @@
         protected long timeout = -1;
         protected boolean error = false;
         protected NioChannel channel = null;
-        protected CountDownLatch latch = null;
-        protected int latchOps = 0;
+        protected CountDownLatch readLatch = null;
+        protected CountDownLatch writeLatch = null;
         protected int fairness = 0;
         protected long lastRegistered = 0;
         protected SendfileData sendfileData = null;
     }
-// ----------------------------------------------------- Key Fairness 
Comparator
-    public static class KeyFairnessComparator implements 
Comparator<SelectionKey> {
-        public int compare(SelectionKey ska1, SelectionKey ska2) {
-            KeyAttachment ka1 = (KeyAttachment)ska1.attachment();
-            KeyAttachment ka2 = (KeyAttachment)ska2.attachment();
-            if ( ka1 == null && ka2 == null ) return 0;
-            if ( ka1 == null ) return 1; //invalid keys go last
-            if ( ka2 == null ) return -1; //invalid keys go last
-            long lr1 = ka1.getLastRegistered();
-            long lr2 = ka2.getLastRegistered();
-            int f1 = ka1.getFairness();
-            int f2 = ka2.getFairness();
-            CountDownLatch lat1 = ka1.getLatch();
-            CountDownLatch lat2 = ka2.getLatch();
-            if ( lat1 != null && lat2 != null ) {
-                return 0;
-            } else if ( lat1 != null && lat2 == null ) {
-                //latches have highest priority 
-                return -1;
-            } else if ( lat1 == null && lat2 != null ) {
-                return 1;
-            } else if ( f1 == f2 ) {
-                if ( lr1 == lr2 ) return 0;
-                //earlier objects have priorioty
-                else return lr1<lr2?-1:1;                
-            } else {
-                //higher fairness means earlier in the queue
-                //as fairness count means how many times the poller has 
skipped 
-                //this socket, and the socket has been ready, there just 
hasn't 
-                //been any worker thread available to handle it
-                return ka1.getFairness()>ka2.getFairness()?-1:1;
-            }
-        }
-    }
-
-
-
-
     // ----------------------------------------------------- Worker Inner Class
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to