Author: fhanik
Date: Tue Mar 14 09:22:04 2006
New Revision: 385839
URL: http://svn.apache.org/viewcvs?rev=385839&view=rev
Log:
Implemented a blocking IO receiver, threaded
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
Tue Mar 14 09:22:04 2006
@@ -44,7 +44,7 @@
* set ack mode
* @param isSendAck
*/
- public void setSendAck(boolean isSendAck);
+ public void setSendAck(boolean sendack);
/**
* get the listing ip interface
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
Tue Mar 14 09:22:04 2006
@@ -20,6 +20,7 @@
import java.nio.channels.SocketChannel;
import org.apache.catalina.tribes.ChannelMessage;
import java.io.IOException;
+import java.net.Socket;
@@ -62,6 +63,16 @@
this.buffer = new XByteBuffer(43800,true);
}
}
+ public ObjectReader(Socket socket, ListenCallback callback) {
+ try{
+ this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true);
+ }catch ( IOException x ) {
+ //unable to get buffer size
+ log.warn("Unable to retrieve the socket receiver buffer size,
setting to default 43800 bytes.");
+ this.buffer = new XByteBuffer(43800,true);
+ }
+ this.callback = callback;
+ }
/**
* get the current SimpleTcpCluster
@@ -125,6 +136,10 @@
return pkgCnt;
}
+ public int bufferSize() {
+ return buffer.getLength();
+ }
+
/**
* Returns the number of packages that the reader has read
* @return int
@@ -141,6 +156,12 @@
*/
public int write(ByteBuffer buf) throws java.io.IOException {
return getChannel().write(buf);
+ }
+
+ public void close() {
+ this.callback = null;
+ this.channel = null;
+ this.buffer = null;
}
}
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=385839&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
Tue Mar 14 09:22:04 2006
@@ -0,0 +1,306 @@
+/*
+ * Copyright 1999,2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.catalina.tribes.tcp.bio;
+
+import java.io.IOException;
+
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.MessageListener;
+import java.net.InetAddress;
+import org.apache.catalina.tribes.tcp.nio.ThreadPool;
+import java.net.ServerSocket;
+import java.net.InetSocketAddress;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.ChannelMessage;
+import java.net.Socket;
+import org.apache.catalina.tribes.io.ObjectReader;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class BioReceiver implements Runnable, ChannelReceiver, ListenCallback {
+
+ protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(BioReceiver.class);
+
+
+ protected MessageListener listener;
+ protected String host;
+ protected InetAddress bind;
+ protected int port;
+ protected boolean sendack;
+ protected boolean sync;
+ protected int rxBufSize = 43800;
+ protected int txBufSize = 25188;
+ protected int tcpThreadCount;
+ protected ServerSocket serverSocket;
+ protected boolean doRun = true;
+
+ protected ThreadPool pool;
+
+ public BioReceiver() {
+ }
+
+ /**
+ *
+ * @return The host
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public String getHost() {
+ return host;
+ }
+
+ /**
+ * getMessageListener
+ *
+ * @return MessageListener
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public MessageListener getMessageListener() {
+ return listener;
+ }
+
+ /**
+ *
+ * @return The port
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public int getPort() {
+ return port;
+ }
+
+ public int getRxBufSize() {
+ return rxBufSize;
+ }
+
+ public int getTxBufSize() {
+ return txBufSize;
+ }
+
+ public int getTcpThreadCount() {
+ return tcpThreadCount;
+ }
+
+ /**
+ *
+ * @return boolean
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public boolean getSendAck() {
+ return sendack;
+ }
+
+ /**
+ * setMessageListener
+ *
+ * @param listener MessageListener
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public void setMessageListener(MessageListener listener) {
+ this.listener = listener;
+ }
+
+
+ /**
+ *
+ * @param isSendAck boolean
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public void setSendAck(boolean sendAck) {
+ this.sendack = sendAck;
+ }
+
+ /**
+ *
+ * @throws IOException
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public void start() throws IOException {
+ this.doRun = true;
+ try {
+ TcpReplicationThread[] receivers = new
TcpReplicationThread[tcpThreadCount];
+ for ( int i=0; i<receivers.length; i++ ) {
+ receivers[i] = getReplicationThread();
+ }
+ pool = new ThreadPool(new Object(), receivers);
+ } catch (Exception e) {
+ log.error("ThreadPool can initilzed. Listener not started", e);
+ return;
+ }
+ try {
+ getBind();
+ bind();
+ Thread t = new Thread(this, "BioReceiver");
+ t.setDaemon(true);
+ t.start();
+ } catch (Exception x) {
+ log.fatal("Unable to start cluster receiver", x);
+ }
+ }
+
+ protected TcpReplicationThread getReplicationThread() {
+ TcpReplicationThread result = new TcpReplicationThread();
+ result.setOptions(getWorkerThreadOptions());
+ return result;
+ }
+
+ /**
+ *
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public void stop() {
+ this.doRun = false;
+ try {
+ this.serverSocket.close();
+ }catch ( Exception x ) {}
+ }
+
+ public void setTcpListenPort(int tcpListenPort) {
+ this.port = tcpListenPort;
+ }
+
+ public void setTcpListenAddress(String tcpListenHost) {
+ this.host = tcpListenHost;
+ }
+
+ public void setSynchronized(boolean sync) {
+ this.sync = sync;
+ }
+
+ public void setRxBufSize(int rxBufSize) {
+ this.rxBufSize = rxBufSize;
+ }
+
+ public void setTxBufSize(int txBufSize) {
+ this.txBufSize = txBufSize;
+ }
+
+ public void setTcpThreadCount(int tcpThreadCount) {
+ this.tcpThreadCount = tcpThreadCount;
+ }
+
+ public void setTcpSelectorTimeout(long timeout) {
+ //do nothing
+ }
+
+ /**
+ * @return Returns the bind.
+ */
+ public InetAddress getBind() {
+ if (bind == null) {
+ try {
+ if ("auto".equals(host)) {
+ host =
java.net.InetAddress.getLocalHost().getHostAddress();
+ }
+ if (log.isDebugEnabled())
+ log.debug("Starting replication listener on address:"+
host);
+ bind = java.net.InetAddress.getByName(host);
+ } catch (IOException ioe) {
+ log.error("Failed bind replication listener on address:"+
host, ioe);
+ }
+ }
+ return bind;
+ }
+
+ protected int bind(ServerSocket socket, int portstart, int retries) throws
IOException {
+ while ( retries > 0 ) {
+ try {
+ InetSocketAddress addr = new InetSocketAddress(getBind(),
portstart);
+ socket.bind(addr);
+ setTcpListenPort(portstart);
+ log.info("Bio Server Socket bound to:"+addr);
+ return 0;
+ }catch ( IOException x) {
+ retries--;
+ if ( retries <= 0 ) throw x;
+ portstart++;
+ retries = bind(socket,portstart,retries);
+ }
+ }
+ return retries;
+ }
+
+
+
+ protected void bind() throws IOException {
+ // allocate an unbound server socket channel
+ serverSocket = new ServerSocket();
+ // set the port the server channel will listen to
+ //serverSocket.bind(new InetSocketAddress(getBind(),
getTcpListenPort()));
+ bind(serverSocket,getPort(),10);
+ }
+
+ public void messageDataReceived(ChannelMessage data) {
+ if ( this.listener != null ) {
+ listener.messageReceived(data);
+ }
+ }
+
+ public void run() {
+ try {
+ listen();
+ } catch (Exception x) {
+ log.error("Unable to run replication listener.", x);
+ }
+
+ }
+
+ public void listen() throws Exception {
+ while ( doRun ) {
+ Socket socket = null;
+ try {
+ socket = serverSocket.accept();
+ }catch ( Exception x ) {
+ if ( doRun ) throw x;
+ }
+ if ( !doRun ) break;
+ if ( socket == null ) continue;
+ socket.setReceiveBufferSize(rxBufSize);
+ socket.setSendBufferSize(txBufSize);
+ TcpReplicationThread thread =
(TcpReplicationThread)pool.getWorker();
+ ObjectReader reader = new ObjectReader(socket,this);
+
+ if ( thread == null ) {
+ //we are out of workers, process the request on the listening
thread
+ thread = getReplicationThread();
+ thread.socket = socket;
+ thread.reader = reader;
+ thread.run();
+ } else {
+ thread.serviceSocket(socket,reader);
+ }//end if
+ }//while
+ }
+
+ public int getWorkerThreadOptions() {
+ int options = 0;
+ if ( sync ) options = options
|TcpReplicationThread.OPTION_SYNCHRONIZED;
+ if ( getSendAck() ) options = options
|TcpReplicationThread.OPTION_SEND_ACK;
+ return options;
+ }
+
+
+
+
+}
\ No newline at end of file
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
Tue Mar 14 09:22:04 2006
@@ -1,5 +1,5 @@
/*
- * Copyright 1999,2005 The Apache Software Foundation.
+ * Copyright 1999,2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
of
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
Tue Mar 14 09:22:04 2006
@@ -33,7 +33,7 @@
protected long selectTimeout = 1000;
protected boolean waitForAck = false;
protected int retryAttempts=0;
- protected int keepAliveCount = Integer.MAX_VALUE;
+ protected int keepAliveCount = -1;
protected HashMap bioSenders = new HashMap();
protected boolean directBuf = false;
protected int rxBufSize = 43800;
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java?rev=385839&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
Tue Mar 14 09:22:04 2006
@@ -0,0 +1,173 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.tcp.bio;
+import java.io.IOException;
+
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.tcp.Constants;
+import org.apache.catalina.tribes.tcp.nio.WorkerThread;
+import java.net.Socket;
+import java.io.InputStream;
+
+/**
+ * A worker thread class which can drain channels and echo-back the input. Each
+ * instance is constructed with a reference to the owning thread pool object.
+ * When started, the thread loops forever waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
+ * calling its serviceChannel() method with a SelectionKey object. The
+ * serviceChannel() method stores the key reference in the thread object then
+ * calls notify() to wake it up. When the channel has been drained, the worker
+ * thread returns itself to its parent pool.
+ *
+ * @author Filip Hanik
+ *
+ * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb
2006) $
+ */
+public class TcpReplicationThread extends WorkerThread {
+ public static final int OPTION_SEND_ACK = 0x0001;
+ public static final int OPTION_SYNCHRONIZED = 0x0002;
+ public static final int OPTION_DIRECT_BUFFER = 0x0004;
+
+
+ protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
+
+ protected Socket socket;
+ protected ObjectReader reader;
+
+ public TcpReplicationThread ()
+ {
+ }
+
+ // loop forever waiting for work to do
+ public synchronized void run()
+ {
+ while (doRun) {
+ try {
+ // sleep and release object lock
+ this.wait();
+ } catch (InterruptedException e) {
+ if(log.isInfoEnabled())
+ log.info("TCP worker thread interrupted in cluster",e);
+ // clear interrupt status
+ Thread.interrupted();
+ }
+ if ( this.socket != null ) {
+ try {
+ drainSocket();
+ } catch ( Exception x ) {
+ log.error("Unable to service bio socket");
+ }finally {
+ try {reader.close();}catch ( Exception x){}
+ try {socket.close();}catch ( Exception x){}
+ reader = null;
+ socket = null;
+ }
+ }
+ // done, ready for more, return to pool
+ if ( this.pool != null ) this.pool.returnWorker (this);
+ else doRun = false;
+ }
+ }
+
+
+ public synchronized void serviceSocket(Socket socket, ObjectReader reader)
{
+ this.socket = socket;
+ this.reader = reader;
+ this.notify(); // awaken the thread
+ }
+
+ protected void execute(ObjectReader reader) throws Exception{
+ int pkgcnt = reader.count();
+ /**
+ * Use send ack here if you want to ack the request to the remote
+ * server before completing the request
+ * This is considered an asynchronized request
+ */
+ if (sendAckAsync()) {
+ while ( pkgcnt > 0 ) {
+ sendAck();
+ pkgcnt--;
+ }
+ }
+ //check to see if any data is available
+ pkgcnt = reader.execute();
+
+ /**
+ * Use send ack here if you want the request to complete on this
+ * server before sending the ack to the remote server
+ * This is considered a synchronized request
+ */
+ if (sendAckSync()) {
+ while ( pkgcnt > 0 ) {
+ sendAck();
+ pkgcnt--;
+ }
+ }
+
+ }
+
+ /**
+ * The actual code which drains the channel associated with
+ * the given key. This method assumes the key has been
+ * modified prior to invocation to turn off selection
+ * interest in OP_READ. When this method completes it
+ * re-enables OP_READ and calls wakeup() on the selector
+ * so the selector will resume watching this channel.
+ */
+ protected void drainSocket () throws Exception {
+ InputStream in = socket.getInputStream();
+ // loop while data available, channel is non-blocking
+ byte[] buf = new byte[1024];
+ int length = in.read(buf);
+ while ( length >= 0 ) {
+ int count = reader.append(buf,0,length,true);
+ if ( count > 0 ) execute(reader);
+ if ( in.available() == 0 && reader.bufferSize() == 0 ) length = -1;
+ else length = in.read(buf);
+ }
+ }
+
+
+ public boolean sendAckSync() {
+ int options = getOptions();
+ return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) &&
+ ((OPTION_SYNCHRONIZED & options) == OPTION_SYNCHRONIZED);
+ }
+
+ public boolean sendAckAsync() {
+ int options = getOptions();
+ return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) &&
+ ((OPTION_SYNCHRONIZED & options) != OPTION_SYNCHRONIZED);
+ }
+
+
+ /**
+ * send a reply-acknowledgement (6,2,3)
+ * @param key
+ * @param channel
+ */
+ protected void sendAck() {
+ try {
+ this.socket.getOutputStream().write(Constants.ACK_COMMAND);
+ if (log.isTraceEnabled()) {
+ log.trace("ACK sent to " + socket.getPort());
+ }
+ } catch ( java.io.IOException x ) {
+ log.warn("Unable to send ACK back through channel, channel
disconnected?: "+x.getMessage());
+ }
+ }
+}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
Tue Mar 14 09:22:04 2006
@@ -136,7 +136,7 @@
}
try {
getBind();
- this.bind();
+ bind();
Thread t = new Thread(this, "NioReceiver");
t.setDaemon(true);
t.start();
@@ -310,7 +310,7 @@
try {
listen();
} catch (Exception x) {
- log.error("Unable to start replication listener.", x);
+ log.error("Unable to run replication listener.", x);
}
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
Tue Mar 14 09:22:04 2006
@@ -17,6 +17,7 @@
package org.apache.catalina.tribes.tcp.nio;
import java.util.LinkedList;
import java.util.List;
+import java.util.Iterator;
/**
* @author not attributable
@@ -34,6 +35,7 @@
List idle = new LinkedList();
Object mutex = new Object();
Object interestOpsMutex = null;
+ boolean running = true;
public ThreadPool (Object interestOpsMutex, WorkerThread[] threads) throws
Exception {
// fill up the pool with worker threads
@@ -98,12 +100,29 @@
* idle pool.
*/
public void returnWorker (WorkerThread worker) {
- synchronized (mutex) {
- idle.add (worker);
- mutex.notify();
+ if ( running ) {
+ synchronized (mutex) {
+ idle.add(worker);
+ mutex.notify();
+ }
+ }else {
+ worker.doRun = false;
+ synchronized (worker){worker.notify();}
}
}
public Object getInterestOpsMutex() {
return interestOpsMutex;
+ }
+
+ public void stop() {
+ running = false;
+ synchronized (mutex) {
+ Iterator i = idle.iterator();
+ while ( i.hasNext() ) {
+ WorkerThread worker = (WorkerThread)i.next();
+ returnWorker(worker);
+ i.remove();
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]