Author: fhanik Date: Tue Mar 14 09:22:04 2006 New Revision: 385839 URL: http://svn.apache.org/viewcvs?rev=385839&view=rev Log: Implemented a blocking IO receiver, threaded
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java?rev=385839&r1=385838&r2=385839&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java Tue Mar 14 09:22:04 2006 @@ -44,7 +44,7 @@ * set ack mode * @param isSendAck */ - public void setSendAck(boolean isSendAck); + public void setSendAck(boolean sendack); /** * get the listing ip interface Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=385839&r1=385838&r2=385839&view=diff 
============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Tue Mar 14 09:22:04 2006 @@ -20,6 +20,7 @@ import java.nio.channels.SocketChannel; import org.apache.catalina.tribes.ChannelMessage; import java.io.IOException; +import java.net.Socket; @@ -62,6 +63,16 @@ this.buffer = new XByteBuffer(43800,true); } } + public ObjectReader(Socket socket, ListenCallback callback) { + try{ + this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true); + }catch ( IOException x ) { + //unable to get buffer size + log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes."); + this.buffer = new XByteBuffer(43800,true); + } + this.callback = callback; + } /** * get the current SimpleTcpCluster @@ -125,6 +136,10 @@ return pkgCnt; } + public int bufferSize() { + return buffer.getLength(); + } + /** * Returns the number of packages that the reader has read * @return int @@ -141,6 +156,12 @@ */ public int write(ByteBuffer buf) throws java.io.IOException { return getChannel().write(buf); + } + + public void close() { + this.callback = null; + this.channel = null; + this.buffer = null; } } Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=385839&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java Tue Mar 14 09:22:04 2006 @@ -0,0 +1,306 @@ +/* + * 
Copyright 1999,2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.catalina.tribes.tcp.bio; + +import java.io.IOException; + +import org.apache.catalina.tribes.ChannelReceiver; +import org.apache.catalina.tribes.MessageListener; +import java.net.InetAddress; +import org.apache.catalina.tribes.tcp.nio.ThreadPool; +import java.net.ServerSocket; +import java.net.InetSocketAddress; +import org.apache.catalina.tribes.io.ListenCallback; +import org.apache.catalina.tribes.ChannelMessage; +import java.net.Socket; +import org.apache.catalina.tribes.io.ObjectReader; + +/** + * <p>Title: </p> + * + * <p>Description: </p> + * + * <p>Copyright: Copyright (c) 2005</p> + * + * <p>Company: </p> + * + * @author not attributable + * @version 1.0 + */ +public class BioReceiver implements Runnable, ChannelReceiver, ListenCallback { + + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(BioReceiver.class); + + + protected MessageListener listener; + protected String host; + protected InetAddress bind; + protected int port; + protected boolean sendack; + protected boolean sync; + protected int rxBufSize = 43800; + protected int txBufSize = 25188; + protected int tcpThreadCount; + protected ServerSocket serverSocket; + protected boolean doRun = true; + + protected ThreadPool pool; + + public BioReceiver() { + } + + /** + * + * @return The host + * @todo Implement this 
org.apache.catalina.tribes.ChannelReceiver method + */ + public String getHost() { + return host; + } + + /** + * getMessageListener + * + * @return MessageListener + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public MessageListener getMessageListener() { + return listener; + } + + /** + * + * @return The port + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public int getPort() { + return port; + } + + public int getRxBufSize() { + return rxBufSize; + } + + public int getTxBufSize() { + return txBufSize; + } + + public int getTcpThreadCount() { + return tcpThreadCount; + } + + /** + * + * @return boolean + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public boolean getSendAck() { + return sendack; + } + + /** + * setMessageListener + * + * @param listener MessageListener + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public void setMessageListener(MessageListener listener) { + this.listener = listener; + } + + + /** + * + * @param isSendAck boolean + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public void setSendAck(boolean sendAck) { + this.sendack = sendAck; + } + + /** + * + * @throws IOException + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public void start() throws IOException { + this.doRun = true; + try { + TcpReplicationThread[] receivers = new TcpReplicationThread[tcpThreadCount]; + for ( int i=0; i<receivers.length; i++ ) { + receivers[i] = getReplicationThread(); + } + pool = new ThreadPool(new Object(), receivers); + } catch (Exception e) { + log.error("ThreadPool can initilzed. 
Listener not started", e); + return; + } + try { + getBind(); + bind(); + Thread t = new Thread(this, "BioReceiver"); + t.setDaemon(true); + t.start(); + } catch (Exception x) { + log.fatal("Unable to start cluster receiver", x); + } + } + + protected TcpReplicationThread getReplicationThread() { + TcpReplicationThread result = new TcpReplicationThread(); + result.setOptions(getWorkerThreadOptions()); + return result; + } + + /** + * + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public void stop() { + this.doRun = false; + try { + this.serverSocket.close(); + }catch ( Exception x ) {} + } + + public void setTcpListenPort(int tcpListenPort) { + this.port = tcpListenPort; + } + + public void setTcpListenAddress(String tcpListenHost) { + this.host = tcpListenHost; + } + + public void setSynchronized(boolean sync) { + this.sync = sync; + } + + public void setRxBufSize(int rxBufSize) { + this.rxBufSize = rxBufSize; + } + + public void setTxBufSize(int txBufSize) { + this.txBufSize = txBufSize; + } + + public void setTcpThreadCount(int tcpThreadCount) { + this.tcpThreadCount = tcpThreadCount; + } + + public void setTcpSelectorTimeout(long timeout) { + //do nothing + } + + /** + * @return Returns the bind. 
+ */ + public InetAddress getBind() { + if (bind == null) { + try { + if ("auto".equals(host)) { + host = java.net.InetAddress.getLocalHost().getHostAddress(); + } + if (log.isDebugEnabled()) + log.debug("Starting replication listener on address:"+ host); + bind = java.net.InetAddress.getByName(host); + } catch (IOException ioe) { + log.error("Failed bind replication listener on address:"+ host, ioe); + } + } + return bind; + } + + protected int bind(ServerSocket socket, int portstart, int retries) throws IOException { + while ( retries > 0 ) { + try { + InetSocketAddress addr = new InetSocketAddress(getBind(), portstart); + socket.bind(addr); + setTcpListenPort(portstart); + log.info("Bio Server Socket bound to:"+addr); + return 0; + }catch ( IOException x) { + retries--; + if ( retries <= 0 ) throw x; + portstart++; + retries = bind(socket,portstart,retries); + } + } + return retries; + } + + + + protected void bind() throws IOException { + // allocate an unbound server socket channel + serverSocket = new ServerSocket(); + // set the port the server channel will listen to + //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort())); + bind(serverSocket,getPort(),10); + } + + public void messageDataReceived(ChannelMessage data) { + if ( this.listener != null ) { + listener.messageReceived(data); + } + } + + public void run() { + try { + listen(); + } catch (Exception x) { + log.error("Unable to run replication listener.", x); + } + + } + + public void listen() throws Exception { + while ( doRun ) { + Socket socket = null; + try { + socket = serverSocket.accept(); + }catch ( Exception x ) { + if ( doRun ) throw x; + } + if ( !doRun ) break; + if ( socket == null ) continue; + socket.setReceiveBufferSize(rxBufSize); + socket.setSendBufferSize(txBufSize); + TcpReplicationThread thread = (TcpReplicationThread)pool.getWorker(); + ObjectReader reader = new ObjectReader(socket,this); + + if ( thread == null ) { + //we are out of workers, process the 
request on the listening thread + thread = getReplicationThread(); + thread.socket = socket; + thread.reader = reader; + thread.run(); + } else { + thread.serviceSocket(socket,reader); + }//end if + }//while + } + + public int getWorkerThreadOptions() { + int options = 0; + if ( sync ) options = options |TcpReplicationThread.OPTION_SYNCHRONIZED; + if ( getSendAck() ) options = options |TcpReplicationThread.OPTION_SEND_ACK; + return options; + } + + + + +} \ No newline at end of file Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=385839&r1=385838&r2=385839&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Tue Mar 14 09:22:04 2006 @@ -1,5 +1,5 @@ /* - * Copyright 1999,2005 The Apache Software Foundation. + * Copyright 1999,2006 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. 
You may obtain a copy of Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=385839&r1=385838&r2=385839&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java Tue Mar 14 09:22:04 2006 @@ -33,7 +33,7 @@ protected long selectTimeout = 1000; protected boolean waitForAck = false; protected int retryAttempts=0; - protected int keepAliveCount = Integer.MAX_VALUE; + protected int keepAliveCount = -1; protected HashMap bioSenders = new HashMap(); protected boolean directBuf = false; protected int rxBufSize = 43800; Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java?rev=385839&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java Tue Mar 14 09:22:04 2006 @@ -0,0 +1,173 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.tcp.bio; +import java.io.IOException; + +import org.apache.catalina.tribes.io.ObjectReader; +import org.apache.catalina.tribes.tcp.Constants; +import org.apache.catalina.tribes.tcp.nio.WorkerThread; +import java.net.Socket; +import java.io.InputStream; + +/** + * A worker thread class which drains a blocking socket and, depending on its + * options, sends an ACK back for each message received. When started, the + * thread loops waiting to be awakened to service a Socket. The worker is + * tasked by calling its serviceSocket() method with a Socket and the + * ObjectReader for that socket. The serviceSocket() method stores both + * references in the thread object then calls notify() to wake it up. When the + * socket has been drained, the reader and the socket are closed and the worker + * thread returns itself to its parent pool, if it has one. 
+ * + * @author Filip Hanik + * + * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $ + */ +public class TcpReplicationThread extends WorkerThread { + public static final int OPTION_SEND_ACK = 0x0001; + public static final int OPTION_SYNCHRONIZED = 0x0002; + public static final int OPTION_DIRECT_BUFFER = 0x0004; + + + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class ); + + protected Socket socket; + protected ObjectReader reader; + + public TcpReplicationThread () + { + } + + // loop forever waiting for work to do + public synchronized void run() + { + while (doRun) { + try { + // sleep and release object lock + this.wait(); + } catch (InterruptedException e) { + if(log.isInfoEnabled()) + log.info("TCP worker thread interrupted in cluster",e); + // clear interrupt status + Thread.interrupted(); + } + if ( this.socket != null ) { + try { + drainSocket(); + } catch ( Exception x ) { + log.error("Unable to service bio socket"); + }finally { + try {reader.close();}catch ( Exception x){} + try {socket.close();}catch ( Exception x){} + reader = null; + socket = null; + } + } + // done, ready for more, return to pool + if ( this.pool != null ) this.pool.returnWorker (this); + else doRun = false; + } + } + + + public synchronized void serviceSocket(Socket socket, ObjectReader reader) { + this.socket = socket; + this.reader = reader; + this.notify(); // awaken the thread + } + + protected void execute(ObjectReader reader) throws Exception{ + int pkgcnt = reader.count(); + /** + * Use send ack here if you want to ack the request to the remote + * server before completing the request + * This is considered an asynchronized request + */ + if (sendAckAsync()) { + while ( pkgcnt > 0 ) { + sendAck(); + pkgcnt--; + } + } + //check to see if any data is available + pkgcnt = reader.execute(); + + /** + * Use send ack here if you want the request to complete on this + * 
server before sending the ack to the remote server + * This is considered a synchronized request + */ + if (sendAckSync()) { + while ( pkgcnt > 0 ) { + sendAck(); + pkgcnt--; + } + } + + } + + /** + * The actual code which drains the channel associated with + * the given key. This method assumes the key has been + * modified prior to invocation to turn off selection + * interest in OP_READ. When this method completes it + * re-enables OP_READ and calls wakeup() on the selector + * so the selector will resume watching this channel. + */ + protected void drainSocket () throws Exception { + InputStream in = socket.getInputStream(); + // loop while data available, channel is non-blocking + byte[] buf = new byte[1024]; + int length = in.read(buf); + while ( length >= 0 ) { + int count = reader.append(buf,0,length,true); + if ( count > 0 ) execute(reader); + if ( in.available() == 0 && reader.bufferSize() == 0 ) length = -1; + else length = in.read(buf); + } + } + + + public boolean sendAckSync() { + int options = getOptions(); + return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) && + ((OPTION_SYNCHRONIZED & options) == OPTION_SYNCHRONIZED); + } + + public boolean sendAckAsync() { + int options = getOptions(); + return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) && + ((OPTION_SYNCHRONIZED & options) != OPTION_SYNCHRONIZED); + } + + + /** + * send a reply-acknowledgement (6,2,3) + * @param key + * @param channel + */ + protected void sendAck() { + try { + this.socket.getOutputStream().write(Constants.ACK_COMMAND); + if (log.isTraceEnabled()) { + log.trace("ACK sent to " + socket.getPort()); + } + } catch ( java.io.IOException x ) { + log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage()); + } + } +} Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=385839&r1=385838&r2=385839&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Tue Mar 14 09:22:04 2006 @@ -136,7 +136,7 @@ } try { getBind(); - this.bind(); + bind(); Thread t = new Thread(this, "NioReceiver"); t.setDaemon(true); t.start(); @@ -310,7 +310,7 @@ try { listen(); } catch (Exception x) { - log.error("Unable to start replication listener.", x); + log.error("Unable to run replication listener.", x); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java?rev=385839&r1=385838&r2=385839&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java Tue Mar 14 09:22:04 2006 @@ -17,6 +17,7 @@ package org.apache.catalina.tribes.tcp.nio; import java.util.LinkedList; import java.util.List; +import java.util.Iterator; /** * @author not attributable @@ -34,6 +35,7 @@ List idle = new LinkedList(); Object mutex = new Object(); Object interestOpsMutex = null; + boolean running = true; public ThreadPool (Object interestOpsMutex, WorkerThread[] threads) throws Exception { // fill up the pool with worker threads @@ -98,12 +100,29 @@ * idle pool. 
*/ public void returnWorker (WorkerThread worker) { - synchronized (mutex) { - idle.add (worker); - mutex.notify(); + if ( running ) { + synchronized (mutex) { + idle.add(worker); + mutex.notify(); + } + }else { + worker.doRun = false; + synchronized (worker){worker.notify();} } } public Object getInterestOpsMutex() { return interestOpsMutex; + } + + public void stop() { + running = false; + synchronized (mutex) { + Iterator i = idle.iterator(); + while ( i.hasNext() ) { + WorkerThread worker = (WorkerThread)i.next(); + returnWorker(worker); + i.remove(); + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]