Author: fhanik Date: Tue Mar 14 10:41:41 2006 New Revision: 385849 URL: http://svn.apache.org/viewcvs?rev=385849&view=rev Log: Refactored both receivers (NIO and blocking IO) so that they share a common receiver base class (ReceiverBase).
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=385849&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java Tue Mar 14 10:41:41 2006 @@ -0,0 +1,263 @@ +/* + * Copyright 1999,2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.catalina.tribes.tcp; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; + +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.ChannelReceiver; +import org.apache.catalina.tribes.MessageListener; +import org.apache.catalina.tribes.io.ListenCallback; +import org.apache.catalina.tribes.tcp.nio.ThreadPool; + +/** + * <p>Title: </p> + * + * <p>Description: </p> + * + * <p>Copyright: Copyright (c) 2005</p> + * + * <p>Company: </p> + * + * @author not attributable + * @version 1.0 + */ +public abstract class ReceiverBase implements ChannelReceiver, ListenCallback { + + public static final int OPTION_SEND_ACK = 0x0001; + public static final int OPTION_SYNCHRONIZED = 0x0002; + public static final int OPTION_DIRECT_BUFFER = 0x0004; + + + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ReceiverBase.class); + + protected MessageListener listener; + protected String host; + protected InetAddress bind; + protected int port; + protected boolean sendack; + protected boolean sync; + protected int rxBufSize = 43800; + protected int txBufSize = 25188; + protected int tcpThreadCount; + protected boolean doListen = false; + protected ThreadPool pool; + protected boolean direct = true; + protected long tcpSelectorTimeout; + + public ReceiverBase() { + } + + /** + * getMessageListener + * + * @return MessageListener + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public MessageListener getMessageListener() { + return listener; + } + + /** + * + * @return The port + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public int getPort() { + return port; + } + + public int getRxBufSize() { + return rxBufSize; + } + + public int getTxBufSize() { + return txBufSize; + } + + public int getTcpThreadCount() { + return tcpThreadCount; + } + + /** + * 
+ * @return boolean + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public boolean getSendAck() { + return sendack; + } + + /** + * setMessageListener + * + * @param listener MessageListener + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public void setMessageListener(MessageListener listener) { + this.listener = listener; + } + + + /** + * + * @param isSendAck boolean + * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method + */ + public void setSendAck(boolean sendAck) { + this.sendack = sendAck; + } + public void setTcpListenPort(int tcpListenPort) { + this.port = tcpListenPort; + } + + public void setTcpListenAddress(String tcpListenHost) { + this.host = tcpListenHost; + } + + public void setSynchronized(boolean sync) { + this.sync = sync; + } + + public void setRxBufSize(int rxBufSize) { + this.rxBufSize = rxBufSize; + } + + public void setTxBufSize(int txBufSize) { + this.txBufSize = txBufSize; + } + + public void setTcpThreadCount(int tcpThreadCount) { + this.tcpThreadCount = tcpThreadCount; + } + + /** + * @return Returns the bind. 
+ */ + public InetAddress getBind() { + if (bind == null) { + try { + if ("auto".equals(host)) { + host = java.net.InetAddress.getLocalHost().getHostAddress(); + } + if (log.isDebugEnabled()) + log.debug("Starting replication listener on address:"+ host); + bind = java.net.InetAddress.getByName(host); + } catch (IOException ioe) { + log.error("Failed bind replication listener on address:"+ host, ioe); + } + } + return bind; + } + + /** + * recursive bind to find the next available port + * @param socket ServerSocket + * @param portstart int + * @param retries int + * @return int + * @throws IOException + */ + protected int bind(ServerSocket socket, int portstart, int retries) throws IOException { + while ( retries > 0 ) { + try { + InetSocketAddress addr = new InetSocketAddress(getBind(), portstart); + socket.bind(addr); + setTcpListenPort(portstart); + log.info("Nio Server Socket bound to:"+addr); + return 0; + }catch ( IOException x) { + retries--; + if ( retries <= 0 ) throw x; + portstart++; + retries = bind(socket,portstart,retries); + } + } + return retries; + } + + public void messageDataReceived(ChannelMessage data) { + if ( this.listener != null ) { + listener.messageReceived(data); + } + } + + public int getWorkerThreadOptions() { + int options = 0; + if ( getSynchronized() ) options = options |OPTION_SYNCHRONIZED; + if ( getSendAck() ) options = options |OPTION_SEND_ACK; + if ( getDirect() ) options = options | OPTION_DIRECT_BUFFER; + return options; + } + + + /** + * @param bind The bind to set. 
+ */ + public void setBind(java.net.InetAddress bind) { + this.bind = bind; + } + + + public int getTcpListenPort() { + return this.port; + } + + public boolean isSync() { + return sync; + } + + public boolean getDirect() { + return direct; + } + + + + public void setDirect(boolean direct) { + this.direct = direct; + } + + + public boolean getSynchronized() { + return this.sync; + } + + + + public String getHost() { + getBind(); + return this.host; + } + + public long getTcpSelectorTimeout() { + return tcpSelectorTimeout; + } + + public void setTcpSelectorTimeout(long selTimeout) { + tcpSelectorTimeout = selTimeout; + } + /* (non-Javadoc) + * @see org.apache.catalina.tribes.io.ListenCallback#sendAck() + */ + public void sendAck() throws IOException { + // do nothing + } + + +} \ No newline at end of file Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=385849&r1=385848&r2=385849&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java Tue Mar 14 10:41:41 2006 @@ -27,6 +27,7 @@ import org.apache.catalina.tribes.ChannelMessage; import java.net.Socket; import org.apache.catalina.tribes.io.ObjectReader; +import org.apache.catalina.tribes.tcp.ReceiverBase; /** * <p>Title: </p> @@ -40,104 +41,21 @@ * @author not attributable * @version 1.0 */ -public class BioReceiver implements Runnable, ChannelReceiver, ListenCallback { +public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback { protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(BioReceiver.class); - - protected MessageListener listener; - protected String host; - protected InetAddress bind; - protected int port; - protected boolean sendack; - protected boolean sync; - protected int rxBufSize = 43800; - protected int txBufSize = 25188; - protected int tcpThreadCount; protected ServerSocket serverSocket; - protected boolean doRun = true; - - protected ThreadPool pool; public BioReceiver() { } /** * - * @return The host - * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method - */ - public String getHost() { - return host; - } - - /** - * getMessageListener - * - * @return MessageListener - * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method - */ - public MessageListener getMessageListener() { - return listener; - } - - /** - * - * @return The port - * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method - */ - public int getPort() { - return port; - } - - public int getRxBufSize() { - return rxBufSize; - } - - public int getTxBufSize() { - return txBufSize; - } - - public int getTcpThreadCount() { - return tcpThreadCount; - } - - /** - * - * @return boolean - * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method - */ - public boolean getSendAck() { - return sendack; - } - - /** - * setMessageListener - * - * @param listener MessageListener - * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method - */ - public void setMessageListener(MessageListener listener) { - this.listener = listener; - } - - - /** - * - * @param isSendAck boolean - * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method - */ - public void setSendAck(boolean sendAck) { - this.sendack = sendAck; - } - - /** - * * @throws IOException * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method */ public void start() throws IOException { - this.doRun = true; try { TcpReplicationThread[] receivers = new 
TcpReplicationThread[tcpThreadCount]; for ( int i=0; i<receivers.length; i++ ) { @@ -170,78 +88,14 @@ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method */ public void stop() { - this.doRun = false; + this.doListen = false; try { this.serverSocket.close(); }catch ( Exception x ) {} } - public void setTcpListenPort(int tcpListenPort) { - this.port = tcpListenPort; - } - - public void setTcpListenAddress(String tcpListenHost) { - this.host = tcpListenHost; - } - - public void setSynchronized(boolean sync) { - this.sync = sync; - } - - public void setRxBufSize(int rxBufSize) { - this.rxBufSize = rxBufSize; - } - - public void setTxBufSize(int txBufSize) { - this.txBufSize = txBufSize; - } - - public void setTcpThreadCount(int tcpThreadCount) { - this.tcpThreadCount = tcpThreadCount; - } - - public void setTcpSelectorTimeout(long timeout) { - //do nothing - } - /** - * @return Returns the bind. - */ - public InetAddress getBind() { - if (bind == null) { - try { - if ("auto".equals(host)) { - host = java.net.InetAddress.getLocalHost().getHostAddress(); - } - if (log.isDebugEnabled()) - log.debug("Starting replication listener on address:"+ host); - bind = java.net.InetAddress.getByName(host); - } catch (IOException ioe) { - log.error("Failed bind replication listener on address:"+ host, ioe); - } - } - return bind; - } - protected int bind(ServerSocket socket, int portstart, int retries) throws IOException { - while ( retries > 0 ) { - try { - InetSocketAddress addr = new InetSocketAddress(getBind(), portstart); - socket.bind(addr); - setTcpListenPort(portstart); - log.info("Bio Server Socket bound to:"+addr); - return 0; - }catch ( IOException x) { - retries--; - if ( retries <= 0 ) throw x; - portstart++; - retries = bind(socket,portstart,retries); - } - } - return retries; - } - - protected void bind() throws IOException { // allocate an unbound server socket channel @@ -251,11 +105,7 @@ bind(serverSocket,getPort(),10); } - public void 
messageDataReceived(ChannelMessage data) { - if ( this.listener != null ) { - listener.messageReceived(data); - } - } + public void run() { try { @@ -267,14 +117,20 @@ } public void listen() throws Exception { - while ( doRun ) { + if (doListen) { + log.warn("ServerSocket already started"); + return; + } + doListen = true; + + while ( doListen ) { Socket socket = null; try { socket = serverSocket.accept(); }catch ( Exception x ) { - if ( doRun ) throw x; + if ( doListen ) throw x; } - if ( !doRun ) break; + if ( !doListen ) break; //regular shutdown if ( socket == null ) continue; socket.setReceiveBufferSize(rxBufSize); socket.setSendBufferSize(txBufSize); @@ -293,14 +149,5 @@ }//while } - public int getWorkerThreadOptions() { - int options = 0; - if ( sync ) options = options |TcpReplicationThread.OPTION_SYNCHRONIZED; - if ( getSendAck() ) options = options |TcpReplicationThread.OPTION_SEND_ACK; - return options; - } - - - } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java?rev=385849&r1=385848&r2=385849&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java Tue Mar 14 10:41:41 2006 @@ -22,6 +22,7 @@ import org.apache.catalina.tribes.tcp.nio.WorkerThread; import java.net.Socket; import java.io.InputStream; +import org.apache.catalina.tribes.tcp.ReceiverBase; /** * A worker thread class which can drain channels and echo-back the input. 
Each @@ -38,9 +39,9 @@ * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $ */ public class TcpReplicationThread extends WorkerThread { - public static final int OPTION_SEND_ACK = 0x0001; - public static final int OPTION_SYNCHRONIZED = 0x0002; - public static final int OPTION_DIRECT_BUFFER = 0x0004; + public static final int OPTION_SEND_ACK = ReceiverBase.OPTION_SEND_ACK; + public static final int OPTION_SYNCHRONIZED = ReceiverBase.OPTION_SYNCHRONIZED; + public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER; protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class ); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=385849&r1=385848&r2=385849&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Tue Mar 14 10:41:41 2006 @@ -33,20 +33,13 @@ import org.apache.catalina.tribes.io.ObjectReader; import org.apache.catalina.tribes.tcp.Constants; import org.apache.catalina.util.StringManager; +import org.apache.catalina.tribes.tcp.ReceiverBase; /** * @author Filip Hanik * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $ */ -public class NioReceiver implements Runnable, ChannelReceiver, ListenCallback { - /** - * @todo make this configurable - */ - protected int rxBufSize = 43800; - /** - * We are only sending acks - */ - protected int txBufSize = 25188; +public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback { 
protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioReceiver.class); @@ -60,24 +53,12 @@ */ private static final String info = "NioReceiver/1.0"; - private ThreadPool pool = null; - private int tcpThreadCount; - private long tcpSelectorTimeout; private Selector selector = null; private ServerSocketChannel serverChannel = null; - private java.net.InetAddress bind; - private String tcpListenAddress; - private int tcpListenPort; - private boolean sendAck; - protected boolean doListen = false; - - private Object interestOpsMutex = new Object(); - private MessageListener listener = null; - private boolean sync; - private boolean direct; + public NioReceiver() { } @@ -87,25 +68,7 @@ * <code><description>/<version></code>. */ public String getInfo() { - return (info); - - } - - public long getTcpSelectorTimeout() { - return tcpSelectorTimeout; - } - - public void setTcpSelectorTimeout(long tcpSelectorTimeout) { - this.tcpSelectorTimeout = tcpSelectorTimeout; - } - - public int getTcpThreadCount() { - return tcpThreadCount; - } - - public void setTcpThreadCount(int tcpThreadCount) { - this.tcpThreadCount = tcpThreadCount; } public Object getInterestOpsMutex() { @@ -145,31 +108,7 @@ } } - /** - * recursive bind to find the next available port - * @param socket ServerSocket - * @param portstart int - * @param retries int - * @return int - * @throws IOException - */ - protected int bind(ServerSocket socket, int portstart, int retries) throws IOException { - while ( retries > 0 ) { - try { - InetSocketAddress addr = new InetSocketAddress(getBind(), portstart); - socket.bind(addr); - setTcpListenPort(portstart); - log.info("Nio Server Socket bound to:"+addr); - return 0; - }catch ( IOException x) { - retries--; - if ( retries <= 0 ) throw x; - portstart++; - retries = bind(socket,portstart,retries); - } - } - return retries; - } + protected void bind() throws IOException { // allocate an unbound server socket channel @@ -195,7 
+134,7 @@ */ protected void listen() throws Exception { if (doListen) { - log.warn("ServerSocketChannel allready started"); + log.warn("ServerSocketChannel already started"); return; } @@ -206,7 +145,7 @@ // selected set contains keys of the ready channels try { - int n = selector.select(tcpSelectorTimeout); + int n = selector.select(getTcpSelectorTimeout()); if (n == 0) { //there is a good chance that we got here //because the TcpReplicationThread called @@ -339,139 +278,5 @@ } } - public void messageDataReceived(ChannelMessage data) { - if ( this.listener != null ) { - listener.messageReceived(data); - } - } - - /** - * @return Returns the bind. - */ - public java.net.InetAddress getBind() { - if (bind == null) { - try { - if ("auto".equals(tcpListenAddress)) { - tcpListenAddress = java.net.InetAddress.getLocalHost() - .getHostAddress(); - } - if (log.isDebugEnabled()) - log.debug("Starting replication listener on address:"+ tcpListenAddress); - bind = java.net.InetAddress.getByName(tcpListenAddress); - } catch (IOException ioe) { - log.error("Failed bind replication listener on address:"+ tcpListenAddress, ioe); - } - } - return bind; - } - - /** - * @param bind The bind to set. - */ - public void setBind(java.net.InetAddress bind) { - this.bind = bind; - } - - /** - * Send ACK to sender - * - * @return True if sending ACK - */ - public boolean getSendAck() { - return sendAck; - } - - /** - * set ack mode or not! 
- * - * @param sendAck - */ - public void setSendAck(boolean sendAck) { - this.sendAck = sendAck; - } - - public String getTcpListenAddress() { - return tcpListenAddress; - } - - public void setTcpListenAddress(String tcpListenAddress) { - this.tcpListenAddress = tcpListenAddress; - } - - public int getTcpListenPort() { - return tcpListenPort; - } - - public boolean isSync() { - return sync; - } - - public boolean getDirect() { - return direct; - } - - public int getRxBufSize() { - return rxBufSize; - } - - public int getTxBufSize() { - return txBufSize; - } - - public MessageListener getMessageListener() { - return listener; - } - - public void setTcpListenPort(int tcpListenPort) { - this.tcpListenPort = tcpListenPort; - } - - public void setDirect(boolean direct) { - this.direct = direct; - } - - public void setRxBufSize(int rxBufSize) { - this.rxBufSize = rxBufSize; - } - - public void setTxBufSize(int txBufSize) { - this.txBufSize = txBufSize; - } - - public void setSynchronized(boolean sync) { - this.sync = sync; - } - - public boolean getSynchronized() { - return this.sync; - } - - public int getWorkerThreadOptions() { - int options = 0; - if ( getSynchronized() ) options = options |TcpReplicationThread.OPTION_SYNCHRONIZED; - if ( getSendAck() ) options = options |TcpReplicationThread.OPTION_SEND_ACK; - if ( getDirect() ) options = options | TcpReplicationThread.OPTION_DIRECT_BUFFER; - return options; - } - - public void setMessageListener(MessageListener listener) { - this.listener = listener; - } - - public String getHost() { - getBind(); - return getTcpListenAddress(); - } - - public int getPort() { - return getTcpListenPort(); - } - - /* (non-Javadoc) - * @see org.apache.catalina.tribes.io.ListenCallback#sendAck() - */ - public void sendAck() throws IOException { - // do nothing - } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java?rev=385849&r1=385848&r2=385849&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java Tue Mar 14 10:41:41 2006 @@ -22,6 +22,7 @@ import org.apache.catalina.tribes.io.ObjectReader; import org.apache.catalina.tribes.tcp.Constants; +import org.apache.catalina.tribes.tcp.ReceiverBase; /** * A worker thread class which can drain channels and echo-back the input. Each @@ -38,9 +39,10 @@ * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $ */ public class TcpReplicationThread extends WorkerThread { - public static final int OPTION_SEND_ACK = 0x0001; - public static final int OPTION_SYNCHRONIZED = 0x0002; - public static final int OPTION_DIRECT_BUFFER = 0x0004; + public static final int OPTION_SEND_ACK = ReceiverBase.OPTION_SEND_ACK; + public static final int OPTION_SYNCHRONIZED = ReceiverBase.OPTION_SYNCHRONIZED; + public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER; + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class ); Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=385849&r1=385848&r2=385849&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java (original) +++ 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java Tue Mar 14 10:41:41 2006 @@ -28,7 +28,9 @@ import org.apache.catalina.tribes.tcp.MultiPointSender; import org.apache.catalina.tribes.tcp.ReplicationTransmitter; import org.apache.catalina.tribes.tcp.nio.NioReceiver; +import org.apache.catalina.tribes.tcp.bio.BioReceiver; import org.apache.tomcat.util.IntrospectionUtils; +import org.apache.catalina.tribes.tcp.ReceiverBase; /** * <p>Title: </p> @@ -53,7 +55,8 @@ .append("\n\t\t[-ackto acktimeout]") .append("\n\t\t[-autoconnect true|false]") .append("\n\t\t[-sync true|false]") - .append("\n\t\t[-transport org.apache.catalina.tribes.tcp.nio.ParallelNioSender]") + .append("\n\t\t[-receiver org.apache.catalina.tribes.tcp.nio.NioReceiver|org.apache.catalina.tribes.tcp.bio.BioReceiver|]") + .append("\n\t\t[-transport org.apache.catalina.tribes.tcp.nio.PooledParallelSender|org.apache.catalina.tribes.tcp.bio.PooledMultipointSender]") .append("\n\t\t[-transport.xxx transport specific property]") .append("\n\t\t[-maddr multicastaddr]") .append("\n\t\t[-mport multicastport]") @@ -90,6 +93,7 @@ int fragsize = 1024; Properties transportProperties = new Properties(); String transport = "org.apache.catalina.tribes.tcp.nio.PooledParallelSender"; + String receiver = "org.apache.catalina.tribes.tcp.nio.NioReceiver"; for (int i = 0; i < args.length; i++) { if ("-bind".equals(args[i])) { @@ -126,6 +130,8 @@ String key = args[i]; String val = args[++i]; transportProperties.setProperty(key,val); + } else if ("-receiver".equals(args[i])) { + receiver = args[++i]; } else if ("-maddr".equals(args[i])) { mcastaddr = args[++i]; } else if ("-mport".equals(args[i])) { @@ -138,15 +144,19 @@ mbind = args[++i]; } } - - NioReceiver rl = new NioReceiver(); - rl.setTcpListenAddress(bind); - rl.setTcpListenPort(port); - rl.setTcpSelectorTimeout(tcpseltimeout); - rl.setTcpThreadCount(tcpthreadcount); - rl.getBind(); - rl.setSendAck(ack); - 
rl.setSynchronized(sync); + + System.out.println("Creating receiver class="+receiver); + Class cl = Class.forName(receiver,true,ChannelCreator.class.getClassLoader()); + ReceiverBase rx = (ReceiverBase)cl.newInstance(); + rx.setTcpListenAddress(bind); + rx.setTcpListenPort(port); + rx.setTcpSelectorTimeout(tcpseltimeout); + rx.setTcpThreadCount(tcpthreadcount); + rx.getBind(); + rx.setSendAck(ack); + rx.setSynchronized(sync); + rx.setRxBufSize(43800); + rx.setTxBufSize(25188); ReplicationTransmitter ps = new ReplicationTransmitter(); @@ -174,7 +184,7 @@ service.setMcastPort(mcastport); ManagedChannel channel = new GroupChannel(); - channel.setChannelReceiver(rl); + channel.setChannelReceiver(rx); channel.setChannelSender(ps); channel.setMembershipService(service); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]