Author: fhanik
Date: Thu Nov 20 08:13:02 2008
New Revision: 719264
URL: http://svn.apache.org/viewvc?rev=719264&view=rev
Log:
Fixed read/write timeouts - backport of revision 707670:
http://svn.apache.org/viewvc?view=rev&revision=707670
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?rev=719264&r1=719263&r2=719264&view=diff
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
Thu Nov 20 08:13:02 2008
@@ -81,6 +81,7 @@
public int write(ByteBuffer buf, NioChannel socket, long
writeTimeout,MutableInteger lastWrite) throws IOException {
SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key == null ) throw new IOException("Key no longer registered");
+ KeyReference reference = new KeyReference();
KeyAttachment att = (KeyAttachment) key.attachment();
int written = 0;
boolean timedout = false;
@@ -101,7 +102,7 @@
}
try {
if ( att.getWriteLatch()==null ||
att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
- poller.add(att,SelectionKey.OP_WRITE);
+ poller.add(att,SelectionKey.OP_WRITE,reference);
att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
}catch (InterruptedException ignore) {
Thread.interrupted();
@@ -122,9 +123,10 @@
throw new SocketTimeoutException();
} finally {
poller.remove(att,SelectionKey.OP_WRITE);
- if (timedout && key != null) {
- poller.cancelKey(socket, key);
+ if (timedout && reference.key!=null) {
+ poller.cancelKey(reference.key);
}
+ reference.key = null;
}
return written;
}
@@ -145,6 +147,7 @@
public int read(ByteBuffer buf, NioChannel socket, long readTimeout)
throws IOException {
SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key == null ) throw new IOException("Key no longer registered");
+ KeyReference reference = new KeyReference();
KeyAttachment att = (KeyAttachment) key.attachment();
int read = 0;
boolean timedout = false;
@@ -162,7 +165,7 @@
}
try {
if ( att.getReadLatch()==null ||
att.getReadLatch().getCount()==0) att.startReadLatch(1);
- poller.add(att,SelectionKey.OP_READ);
+ poller.add(att,SelectionKey.OP_READ, reference);
att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
}catch (InterruptedException ignore) {
Thread.interrupted();
@@ -182,9 +185,10 @@
throw new SocketTimeoutException();
} finally {
poller.remove(att,SelectionKey.OP_READ);
- if (timedout && key != null) {
- poller.cancelKey(socket,key);
+ if (timedout && reference.key!=null) {
+ poller.cancelKey(reference.key);
}
+ reference.key = null;
}
return read;
}
@@ -193,10 +197,10 @@
protected class BlockPoller extends Thread {
protected boolean run = true;
protected Selector selector = null;
- protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();
+ protected ConcurrentLinkedQueue<Runnable> events = new
ConcurrentLinkedQueue<Runnable>();
public void disable() { run = false; selector.wakeup();}
protected AtomicInteger wakeupCounter = new AtomicInteger(0);
- public void cancelKey(final NioChannel socket, final SelectionKey key)
{
+ public void cancelKey(final SelectionKey key) {
Runnable r = new Runnable() {
public void run() {
key.cancel();
@@ -219,7 +223,7 @@
}
}
- public void add(final KeyAttachment key, final int ops) {
+ public void add(final KeyAttachment key, final int ops, final
KeyReference ref) {
Runnable r = new Runnable() {
public void run() {
if ( key == null ) return;
@@ -231,6 +235,9 @@
try {
if (sk == null) {
sk = ch.register(selector, ops, key);
+ ref.key = sk;
+ } else if (!sk.isValid()) {
+ cancel(sk,key,ops);
} else {
sk.interestOps(sk.interestOps() | ops);
}
@@ -259,10 +266,15 @@
if
(SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE))
countDown(key.getWriteLatch());
if
(SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
} else {
- sk.interestOps(sk.interestOps() & (~ops));
- if
(SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE))
countDown(key.getWriteLatch());
- if
(SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
- if (sk.interestOps()==0) {
+ if (sk.isValid()) {
+ sk.interestOps(sk.interestOps() & (~ops));
+ if
(SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE))
countDown(key.getWriteLatch());
+ if
(SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+ if (sk.interestOps()==0) {
+ sk.cancel();
+ sk.attach(null);
+ }
+ }else {
sk.cancel();
sk.attach(null);
}
@@ -284,7 +296,7 @@
boolean result = false;
Runnable r = null;
result = (events.size() > 0);
- while ( (r = (Runnable)events.poll()) != null ) {
+ while ( (r = events.poll()) != null ) {
r.run();
result = true;
}
@@ -320,12 +332,12 @@
continue;
}
- Iterator iterator = keyCount > 0 ?
selector.selectedKeys().iterator() : null;
+ Iterator<SelectionKey> iterator = keyCount > 0 ?
selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (run && iterator != null && iterator.hasNext()) {
- SelectionKey sk = (SelectionKey) iterator.next();
+ SelectionKey sk = iterator.next();
KeyAttachment attachment =
(KeyAttachment)sk.attachment();
try {
attachment.access();
@@ -353,15 +365,30 @@
}catch( Exception ignore ) {
if (log.isDebugEnabled())log.debug("",ignore);
}
+ try {
+ selector.close();//Close the selector
+ }catch( Exception ignore ) {
+ if (log.isDebugEnabled())log.debug("",ignore);
+ }
}
public void countDown(CountDownLatch latch) {
if ( latch == null ) return;
latch.countDown();
}
+ }
+
+ public class KeyReference {
+ SelectionKey key = null;
-
-
+ @Override
+ public void finalize() {
+ if (key!=null && key.isValid()) {
+ log.warn("Possible key leak, cancelling key in the
finalizer.");
+ try {key.cancel();}catch (Exception ignore){}
+ }
+ key = null;
+ }
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]