adc 2004/05/03 20:05:36
Modified: modules/network/src/java/org/apache/geronimo/network
SelectionEventListner.java SelectorManager.java
modules/network/src/java/org/apache/geronimo/network/protocol
DatagramProtocol.java ServerSocketAcceptor.java
SocketProtocol.java
Log:
Workarounds for NIO bugs and a few fixes for my own bugs.
Revision Changes Path
1.3 +2 -2
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/SelectionEventListner.java
Index: SelectionEventListner.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/SelectionEventListner.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SelectionEventListner.java 10 Mar 2004 09:59:12 -0000 1.2
+++ SelectionEventListner.java 4 May 2004 03:05:36 -0000 1.3
@@ -33,5 +33,5 @@
* When the SelectorKey is triggered, the service method will
* be called on the attachment.
*/
- public void selectionEvent(SelectionKey selection);
+ public void selectionEvent(SelectorManager.Event event);
}
1.8 +95 -34
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/SelectorManager.java
Index: SelectorManager.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/SelectorManager.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SelectorManager.java 1 May 2004 23:16:22 -0000 1.7
+++ SelectorManager.java 4 May 2004 03:05:36 -0000 1.8
@@ -25,6 +25,7 @@
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
+import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,11 +58,6 @@
private volatile boolean running;
/**
- * The guard
- */
- private Object guard = new Object();
-
- /**
* The selector used to wait for non-blocking events.
*/
private Selector selector;
@@ -86,6 +82,11 @@
*/
private int startCounter;
+ /**
+ * A list of channels to be closed.
+ */
+ private Stack closing = new Stack();
+
public SelectorManager() throws IOException {
threadGroup = new ThreadGroup("Geronimo NIO Workers");
@@ -127,15 +128,51 @@
try {
log.debug("Selector Work thread has started.");
- log.debug("Selector Manager timeout: "+timeout);
+ log.debug("Selector Manager timeout: " + timeout);
while (running) {
try {
- synchronized (guard) { /* do nothing */
- log.trace("Waiting for selector to return.");
+ synchronized (closing) {
+ if (!closing.isEmpty()) {
+ /**
+ * Close channels that have been queued up to be
+ * closed. Closing channels in this manner prevents
+ * NullPointerExceptions.
+ *
+ * http://developer.java.sun.com/developer/bugParade/bugs/4729342.html
+ */
+ Iterator iter = closing.iterator();
+
+ while (iter.hasNext()) {
+ SelectableChannel selectableChannel =
(SelectableChannel) iter.next();
+ selectableChannel.close();
+ }
+ closing.clear();
+ }
}
- if (selector.select(timeout) == 0) continue;
+ log.trace("Waiting for selector to return.");
+ if (selector.select(timeout) == 0) {
+ /**
+ * Clean stale connections that do not have any data: select
+ * returns indicating that the count of active connections with
+ * input is 0. However the list still has these "stale"
+ * connections lingering around. We remove them since they
+ * are prematurely triggering selection to return w/o input.
+ *
+ * http://nagoya.apache.org/jira/secure/ViewIssue.jspa?key=DIR-18
+ */
+ Iterator list = selector.selectedKeys().iterator();
+
+ while (list.hasNext()) {
+ SelectionKey key = (SelectionKey) list.next();
+ key.channel().close();
+ key.cancel();
+ list.remove();
+ }
+
+ continue;
+ }
// Get a java.util.Set containing the SelectionKey
objects for
// all channels that are ready for I/O.
@@ -143,73 +180,62 @@
// Use a java.util.Iterator to loop through the selected
keys
for (Iterator i = keys.iterator(); i.hasNext();) {
- final SelectionKey key = (SelectionKey) i.next();
+ SelectionKey key = (SelectionKey) i.next();
if (key.isReadable()) {
log.trace("-OP_READ " + key);
key.interestOps(key.interestOps() &
(~SelectionKey.OP_READ));
+ threadPool.getWorkManager().execute(new
Event(key, SelectionKey.OP_READ));
}
if (key.isWritable()) {
log.trace("-OP_WRITE " + key);
key.interestOps(key.interestOps() &
(~SelectionKey.OP_WRITE));
+ threadPool.getWorkManager().execute(new
Event(key, SelectionKey.OP_WRITE));
}
if (key.isAcceptable()) {
log.trace("-OP_ACCEPT " + key);
key.interestOps(key.interestOps() &
(~SelectionKey.OP_ACCEPT));
+ threadPool.getWorkManager().execute(new
Event(key, SelectionKey.OP_ACCEPT));
}
- threadPool.getWorkManager().execute(new Runnable() {
- public void run() {
- try {
- ((SelectionEventListner)
key.attachment()).selectionEvent(key);
- } catch (Throwable e) {
- log.trace("Request Failed.", e);
- }
- }
- });
-
i.remove(); // Remove the key from the set of
selected keys
}
-
+
} catch (CancelledKeyException e) {
- log.debug("Key has Been Cancelled: "+e);
+ log.debug("Key has Been Cancelled: " + e);
}
}
} catch (IOException e) {
log.warn("IOException occured.", e);
} catch (InterruptedException e) {
log.debug("Selector Work thread has been interrupted.");
- } finally {
+ } finally {
log.debug("Selector Work thread has stopped.");
}
}
public SelectionKey register(SelectableChannel selectableChannel, int
ops, SelectionEventListner listener) throws ClosedChannelException {
- synchronized (guard) {
+ synchronized (closing) {
selector.wakeup();
SelectionKey key = selectableChannel.register(selector, ops,
listener);
return key;
}
}
+
public void closeChannel(SelectableChannel selectableChannel) throws
IOException {
- synchronized (guard) {
+ synchronized (closing) {
selector.wakeup();
- selectableChannel.keyFor(selector).cancel();
- selectableChannel.close();
+ closing.push(selectableChannel);
}
}
public void addInterestOps(SelectionKey selectorKey, int addOpts) {
- synchronized (guard) {
+ synchronized (closing) {
selector.wakeup();
- selectorKey.interestOps( selectorKey.interestOps() | addOpts );
+ selectorKey.interestOps(selectorKey.interestOps() | addOpts);
}
}
- public void wakeup() {
- selector.wakeup();
- }
-
public void setGBeanContext(GBeanContext context) {
}
@@ -250,5 +276,40 @@
public static GBeanInfo getGBeanInfo() {
return GBEAN_INFO;
+ }
+
+ public class Event implements Runnable {
+
+ final int flags;
+ final SelectionKey key;
+
+ private Event(SelectionKey key, int flags) {
+ this.flags = flags;
+ this.key = key;
+ }
+
+ public SelectionKey getSelectionKey() {
+ return key;
+ }
+
+ public final boolean isReadable() {
+ return (flags & SelectionKey.OP_READ) != 0;
+ }
+
+ public final boolean isWritable() {
+ return (flags & SelectionKey.OP_WRITE) != 0;
+ }
+
+ public final boolean isAcceptable() {
+ return (flags & SelectionKey.OP_ACCEPT) != 0;
+ }
+
+ public void run() {
+ try {
+ ((SelectionEventListner)
key.attachment()).selectionEvent(this);
+ } catch (Throwable e) {
+ log.trace("Request Failed.", e);
+ }
+ }
}
}
1.6 +2 -2
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/DatagramProtocol.java
Index: DatagramProtocol.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/DatagramProtocol.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- DatagramProtocol.java 25 Apr 2004 02:03:37 -0000 1.5
+++ DatagramProtocol.java 4 May 2004 03:05:36 -0000 1.6
@@ -207,7 +207,7 @@
ByteBuffer receiveBuffer = ByteBuffer.allocate(65336);
- public synchronized void selectionEvent(SelectionKey selection) {
+ public synchronized void selectionEvent(SelectorManager.Event event) {
boolean tracing = log.isTraceEnabled();
if (tracing) log.trace("ReadDataAction triggered.");
1.8 +4 -4
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/ServerSocketAcceptor.java
Index: ServerSocketAcceptor.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/ServerSocketAcceptor.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- ServerSocketAcceptor.java 1 May 2004 17:23:55 -0000 1.7
+++ ServerSocketAcceptor.java 4 May 2004 03:05:36 -0000 1.8
@@ -171,10 +171,10 @@
state = STOPPED;
}
- public void selectionEvent(SelectionKey selection) {
- if (selection.isAcceptable()) {
+ public void selectionEvent(SelectorManager.Event event) {
+ if (event.isAcceptable()) {
try {
- ServerSocketChannel server = (ServerSocketChannel)
selection.channel();
+ ServerSocketChannel server = (ServerSocketChannel)
event.getSelectionKey().channel();
SocketChannel channel = server.accept();
if (channel == null) return;
1.13 +9 -8
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/SocketProtocol.java
Index: SocketProtocol.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/SocketProtocol.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- SocketProtocol.java 1 May 2004 23:16:37 -0000 1.12
+++ SocketProtocol.java 4 May 2004 03:05:36 -0000 1.13
@@ -185,7 +185,7 @@
if (address == null && acceptedSocketChannel == null) throw new
IllegalStateException("No address set");
- log.trace("Starting");
+ log.trace("Starting "+ this);
if (acceptedSocketChannel == null) {
try {
socketChannel = SocketChannel.open();
@@ -271,26 +271,27 @@
}
}
- public void selectionEvent(SelectionKey selection) {
+ public void selectionEvent(SelectorManager.Event event) {
try {
- if (selection.isReadable()) {
+ if (event.isReadable()) {
synchronized (serviceReadMutex) {
serviceRead();
}
}
- if (selection.isWritable()) {
+ if (event.isWritable()) {
synchronized (serviceWriteMutex) {
serviceWrite();
}
}
} catch (CancelledKeyException e) {
+ log.trace("CancelledKeyException " + e);
// who knows, by the time we get here,
// the key could have been canceled.
}
}
private void serviceWrite() {
- log.trace("serviceWrite() triggered.");
+ log.trace("serviceWrite() triggered " + selectionKey);
try {
if (sendBuffer == null) {
log.trace("Write had allready been serviced.");
@@ -328,7 +329,7 @@
public void serviceRead() {
boolean tracing = log.isTraceEnabled();
- if (tracing) log.trace("serviceRead() triggered.");
+ if (tracing) log.trace("serviceRead() triggered " + selectionKey);
lastUsed = System.currentTimeMillis();
try {
while (true) {
@@ -430,7 +431,7 @@
} catch (Throwable e) {
log.info("Closing error: ", e);
}
- log.trace("Closed");
+ log.trace("Closed "+ this);
}
state = STOPPED;
}