Author: fhanik
Date: Tue Mar 14 09:22:04 2006
New Revision: 385839

URL: http://svn.apache.org/viewcvs?rev=385839&view=rev
Log:
Implemented a blocking IO receiver, threaded

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
 Tue Mar 14 09:22:04 2006
@@ -44,7 +44,7 @@
      * set ack mode
      * @param isSendAck
      */
-    public void setSendAck(boolean isSendAck);
+    public void setSendAck(boolean sendack);
     
     /**
      * get the listing ip interface

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
 Tue Mar 14 09:22:04 2006
@@ -20,6 +20,7 @@
 import java.nio.channels.SocketChannel;
 import org.apache.catalina.tribes.ChannelMessage;
 import java.io.IOException;
+import java.net.Socket;
 
 
 
@@ -62,6 +63,16 @@
             this.buffer = new XByteBuffer(43800,true);
         }
     }
+    public ObjectReader(Socket socket, ListenCallback callback) {
+        try{
+            this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true);
+        }catch ( IOException x ) {
+            //unable to get buffer size
+            log.warn("Unable to retrieve the socket receiver buffer size, 
setting to default 43800 bytes.");
+            this.buffer = new XByteBuffer(43800,true);
+        }
+        this.callback = callback;
+    }
 
     /**
      * get the current SimpleTcpCluster
@@ -125,6 +136,10 @@
         return pkgCnt;
     }
     
+    public int bufferSize() {
+        return buffer.getLength();
+    }
+    
     /**
      * Returns the number of packages that the reader has read
      * @return int
@@ -141,6 +156,12 @@
      */
     public int write(ByteBuffer buf) throws java.io.IOException {
         return getChannel().write(buf);
+    }
+    
+    public void close() {
+        this.callback = null;
+        this.channel = null;
+        this.buffer = null;
     }
 
 }

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=385839&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
 Tue Mar 14 09:22:04 2006
@@ -0,0 +1,306 @@
+/*
+ * Copyright 1999,2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.catalina.tribes.tcp.bio;
+
+import java.io.IOException;
+
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.MessageListener;
+import java.net.InetAddress;
+import org.apache.catalina.tribes.tcp.nio.ThreadPool;
+import java.net.ServerSocket;
+import java.net.InetSocketAddress;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.ChannelMessage;
+import java.net.Socket;
+import org.apache.catalina.tribes.io.ObjectReader;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class BioReceiver implements Runnable, ChannelReceiver, ListenCallback {
+
+    protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(BioReceiver.class);
+
+
+    protected MessageListener listener;
+    protected String host;
+    protected InetAddress bind;
+    protected int port;
+    protected boolean sendack;
+    protected boolean sync;
+    protected int rxBufSize = 43800;
+    protected int txBufSize = 25188;    
+    protected int tcpThreadCount;
+    protected ServerSocket serverSocket;
+    protected boolean doRun = true;
+    
+    protected ThreadPool pool;
+
+    public BioReceiver() {
+    }
+
+    /**
+     *
+     * @return The host
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public String getHost() {
+        return host;
+    }
+
+    /**
+     * getMessageListener
+     *
+     * @return MessageListener
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public MessageListener getMessageListener() {
+        return listener;
+    }
+
+    /**
+     *
+     * @return The port
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public int getPort() {
+        return port;
+    }
+
+    public int getRxBufSize() {
+        return rxBufSize;
+    }
+
+    public int getTxBufSize() {
+        return txBufSize;
+    }
+
+    public int getTcpThreadCount() {
+        return tcpThreadCount;
+    }
+
+    /**
+     *
+     * @return boolean
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public boolean getSendAck() {
+        return sendack;
+    }
+
+    /**
+     * setMessageListener
+     *
+     * @param listener MessageListener
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public void setMessageListener(MessageListener listener) {
+        this.listener = listener;
+    }
+    
+
+    /**
+     *
+     * @param isSendAck boolean
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public void setSendAck(boolean sendAck) {
+        this.sendack = sendAck;
+    }
+
+    /**
+     *
+     * @throws IOException
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public void start() throws IOException {
+        this.doRun = true;
+        try {
+            TcpReplicationThread[] receivers = new 
TcpReplicationThread[tcpThreadCount];
+            for ( int i=0; i<receivers.length; i++ ) {
+                receivers[i] = getReplicationThread();
+            }
+            pool = new ThreadPool(new Object(), receivers);
+        } catch (Exception e) {
+            log.error("ThreadPool can initilzed. Listener not started", e);
+            return;
+        }
+        try {
+            getBind();
+            bind();
+            Thread t = new Thread(this, "BioReceiver");
+            t.setDaemon(true);
+            t.start();
+        } catch (Exception x) {
+            log.fatal("Unable to start cluster receiver", x);
+        }
+    }
+    
+    protected TcpReplicationThread getReplicationThread() {
+        TcpReplicationThread result = new TcpReplicationThread();
+        result.setOptions(getWorkerThreadOptions());
+        return result;
+    }
+
+    /**
+     *
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public void stop() {
+        this.doRun = false;
+        try {
+            this.serverSocket.close();
+        }catch ( Exception x ) {}
+    }
+
+    public void setTcpListenPort(int tcpListenPort) {
+        this.port = tcpListenPort;
+    }
+
+    public void setTcpListenAddress(String tcpListenHost) {
+        this.host = tcpListenHost;
+    }
+
+    public void setSynchronized(boolean sync) {
+        this.sync = sync;
+    }
+
+    public void setRxBufSize(int rxBufSize) {
+        this.rxBufSize = rxBufSize;
+    }
+
+    public void setTxBufSize(int txBufSize) {
+        this.txBufSize = txBufSize;
+    }
+
+    public void setTcpThreadCount(int tcpThreadCount) {
+        this.tcpThreadCount = tcpThreadCount;
+    }
+    
+    public void setTcpSelectorTimeout(long timeout) {
+        //do nothing
+    }
+    
+    /**
+     * @return Returns the bind.
+     */
+    public InetAddress getBind() {
+        if (bind == null) {
+            try {
+                if ("auto".equals(host)) {
+                    host = 
java.net.InetAddress.getLocalHost().getHostAddress();
+                }
+                if (log.isDebugEnabled())
+                    log.debug("Starting replication listener on address:"+ 
host);
+                bind = java.net.InetAddress.getByName(host);
+            } catch (IOException ioe) {
+                log.error("Failed bind replication listener on address:"+ 
host, ioe);
+            }
+        }
+        return bind;
+    }
+    
+    protected int bind(ServerSocket socket, int portstart, int retries) throws 
IOException {
+        while ( retries > 0 ) {
+            try {
+                InetSocketAddress addr = new InetSocketAddress(getBind(), 
portstart);
+                socket.bind(addr);
+                setTcpListenPort(portstart);
+                log.info("Bio Server Socket bound to:"+addr);
+                return 0;
+            }catch ( IOException x) {
+                retries--;
+                if ( retries <= 0 ) throw x;
+                portstart++;
+                retries = bind(socket,portstart,retries);
+            }
+        }
+        return retries;
+    }
+
+
+    
+    protected void bind() throws IOException {
+        // allocate an unbound server socket channel
+        serverSocket = new ServerSocket();
+        // set the port the server channel will listen to
+        //serverSocket.bind(new InetSocketAddress(getBind(), 
getTcpListenPort()));
+        bind(serverSocket,getPort(),10);
+    }
+    
+    public void messageDataReceived(ChannelMessage data) {
+        if ( this.listener != null ) {
+            listener.messageReceived(data);
+        }
+    }
+    
+    public void run() {
+        try {
+            listen();
+        } catch (Exception x) {
+            log.error("Unable to run replication listener.", x);
+        }
+
+    }
+    
+    public void listen() throws Exception {
+        while ( doRun ) {
+            Socket socket = null;
+            try {
+                socket = serverSocket.accept();
+            }catch ( Exception x ) {
+                if ( doRun ) throw x;
+            }
+            if ( !doRun ) break;
+            if ( socket == null ) continue;
+            socket.setReceiveBufferSize(rxBufSize);
+            socket.setSendBufferSize(txBufSize);
+            TcpReplicationThread thread = 
(TcpReplicationThread)pool.getWorker();
+            ObjectReader reader = new ObjectReader(socket,this);
+
+            if ( thread == null ) {
+                //we are out of workers, process the request on the listening 
thread
+                thread = getReplicationThread();
+                thread.socket = socket;
+                thread.reader = reader;
+                thread.run();
+            } else { 
+                thread.serviceSocket(socket,reader);
+            }//end if
+        }//while
+    }
+    
+    public int getWorkerThreadOptions() {
+        int options = 0;
+        if ( sync ) options = options 
|TcpReplicationThread.OPTION_SYNCHRONIZED;
+        if ( getSendAck() ) options = options 
|TcpReplicationThread.OPTION_SEND_ACK;
+        return options;
+    }
+
+
+
+
+}
\ No newline at end of file

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
 Tue Mar 14 09:22:04 2006
@@ -1,5 +1,5 @@
 /*
- * Copyright 1999,2005 The Apache Software Foundation.
+ * Copyright 1999,2006 The Apache Software Foundation.
  * 
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy 
of

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
 Tue Mar 14 09:22:04 2006
@@ -33,7 +33,7 @@
     protected long selectTimeout = 1000; 
     protected boolean waitForAck = false;
     protected int retryAttempts=0;
-    protected int keepAliveCount = Integer.MAX_VALUE;
+    protected int keepAliveCount = -1;
     protected HashMap bioSenders = new HashMap();
     protected boolean directBuf = false;
     protected int rxBufSize = 43800;

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java?rev=385839&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
 Tue Mar 14 09:22:04 2006
@@ -0,0 +1,173 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.tcp.bio;
+import java.io.IOException;
+
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.tcp.Constants;
+import org.apache.catalina.tribes.tcp.nio.WorkerThread;
+import java.net.Socket;
+import java.io.InputStream;
+
+/**
+ * A worker thread class which can drain channels and echo-back the input. Each
+ * instance is constructed with a reference to the owning thread pool object.
+ * When started, the thread loops forever waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
+ * calling its serviceChannel() method with a SelectionKey object. The
+ * serviceChannel() method stores the key reference in the thread object then
+ * calls notify() to wake it up. When the channel has been drained, the worker
+ * thread returns itself to its parent pool.
+ * 
+ * @author Filip Hanik
+ * 
+ * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 
2006) $
+ */
+public class TcpReplicationThread extends WorkerThread {
+    public static final int OPTION_SEND_ACK = 0x0001;
+    public static final int OPTION_SYNCHRONIZED = 0x0002;
+    public static final int OPTION_DIRECT_BUFFER = 0x0004;
+
+
+    protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
+    
+    protected Socket socket;
+    protected ObjectReader reader;
+    
+    public TcpReplicationThread ()
+    {
+    }
+
+    // loop forever waiting for work to do
+    public synchronized void run()
+    {
+        while (doRun) {
+            try {
+                // sleep and release object lock
+                this.wait();
+            } catch (InterruptedException e) {
+                if(log.isInfoEnabled())
+                    log.info("TCP worker thread interrupted in cluster",e);
+                // clear interrupt status
+                Thread.interrupted();
+            }
+            if ( this.socket != null ) {
+                try {
+                    drainSocket();
+                } catch ( Exception x ) {
+                    log.error("Unable to service bio socket");
+                }finally {
+                    try {reader.close();}catch ( Exception x){}
+                    try {socket.close();}catch ( Exception x){}
+                    reader = null;
+                    socket = null;
+                }
+            }
+            // done, ready for more, return to pool
+            if ( this.pool != null ) this.pool.returnWorker (this);
+            else doRun = false;
+        }
+    }
+
+    
+    public synchronized void serviceSocket(Socket socket, ObjectReader reader) 
{
+        this.socket = socket;
+        this.reader = reader;
+        this.notify();         // awaken the thread
+    }
+    
+    protected void execute(ObjectReader reader) throws Exception{
+        int pkgcnt = reader.count();
+        /**
+         * Use send ack here if you want to ack the request to the remote 
+         * server before completing the request
+         * This is considered an asynchronized request
+         */
+        if (sendAckAsync()) {
+            while ( pkgcnt > 0 ) {
+                sendAck();
+                pkgcnt--;
+            }
+        }
+        //check to see if any data is available
+        pkgcnt = reader.execute();
+
+        /**
+         * Use send ack here if you want the request to complete on this 
+         * server before sending the ack to the remote server
+         * This is considered a synchronized request
+         */
+        if (sendAckSync()) {
+            while ( pkgcnt > 0 ) {
+                sendAck();
+                pkgcnt--;
+            }
+        }        
+       
+    }
+
+    /**
+     * The actual code which drains the channel associated with
+     * the given key.  This method assumes the key has been
+     * modified prior to invocation to turn off selection
+     * interest in OP_READ.  When this method completes it
+     * re-enables OP_READ and calls wakeup() on the selector
+     * so the selector will resume watching this channel.
+     */
+    protected void drainSocket () throws Exception {
+        InputStream in = socket.getInputStream();
+        // loop while data available, channel is non-blocking
+        byte[] buf = new byte[1024];
+        int length = in.read(buf);
+        while ( length >= 0 ) {
+            int count = reader.append(buf,0,length,true);
+            if ( count > 0 ) execute(reader);
+            if ( in.available() == 0 && reader.bufferSize() == 0 ) length = -1;
+            else length = in.read(buf);
+        }
+    }
+
+
+    public boolean sendAckSync() {
+        int options = getOptions();
+        return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) &&
+               ((OPTION_SYNCHRONIZED & options) == OPTION_SYNCHRONIZED);
+    }
+
+    public boolean sendAckAsync() {
+        int options = getOptions();
+        return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) &&
+               ((OPTION_SYNCHRONIZED & options) != OPTION_SYNCHRONIZED);
+    }
+
+
+    /**
+     * send a reply-acknowledgement (6,2,3)
+     * @param key
+     * @param channel
+     */
+    protected void sendAck() {
+        try {
+            this.socket.getOutputStream().write(Constants.ACK_COMMAND);
+            if (log.isTraceEnabled()) {
+                log.trace("ACK sent to " + socket.getPort());
+            }
+        } catch ( java.io.IOException x ) {
+            log.warn("Unable to send ACK back through channel, channel 
disconnected?: "+x.getMessage());
+        }
+    }
+}

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
 Tue Mar 14 09:22:04 2006
@@ -136,7 +136,7 @@
         }
         try {
             getBind();
-            this.bind();
+            bind();
             Thread t = new Thread(this, "NioReceiver");
             t.setDaemon(true);
             t.start();
@@ -310,7 +310,7 @@
         try {
             listen();
         } catch (Exception x) {
-            log.error("Unable to start replication listener.", x);
+            log.error("Unable to run replication listener.", x);
         }
     }
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java?rev=385839&r1=385838&r2=385839&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
 Tue Mar 14 09:22:04 2006
@@ -17,6 +17,7 @@
 package org.apache.catalina.tribes.tcp.nio;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Iterator;
 
 /**
  * @author not attributable
@@ -34,6 +35,7 @@
     List idle = new LinkedList();
     Object mutex = new Object();
     Object interestOpsMutex = null;
+    boolean running = true;
 
     public ThreadPool (Object interestOpsMutex, WorkerThread[] threads) throws 
Exception {
         // fill up the pool with worker threads
@@ -98,12 +100,29 @@
      * idle pool.
      */
     public void returnWorker (WorkerThread worker) {
-        synchronized (mutex) {
-            idle.add (worker);
-            mutex.notify();
+        if ( running ) {
+            synchronized (mutex) {
+                idle.add(worker);
+                mutex.notify();
+            }
+        }else {
+            worker.doRun = false;
+            synchronized (worker){worker.notify();}
         }
     }
     public Object getInterestOpsMutex() {
         return interestOpsMutex;
+    }
+    
+    public void stop() {
+        running = false;
+        synchronized (mutex) {
+            Iterator i = idle.iterator();
+            while ( i.hasNext() ) {
+                WorkerThread worker = (WorkerThread)i.next();
+                returnWorker(worker);
+                i.remove();
+            }
+        }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to