Author: fhanik
Date: Tue Mar 14 10:41:41 2006
New Revision: 385849

URL: http://svn.apache.org/viewcvs?rev=385849&view=rev
Log:
Refactored receivers both NIO and blocking IO into a receiver base

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java
    
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=385849&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
 Tue Mar 14 10:41:41 2006
@@ -0,0 +1,263 @@
+/*
+ * Copyright 1999,2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.catalina.tribes.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.MessageListener;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.tcp.nio.ThreadPool;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public abstract class ReceiverBase implements ChannelReceiver, ListenCallback {
+
+    public static final int OPTION_SEND_ACK = 0x0001;
+    public static final int OPTION_SYNCHRONIZED = 0x0002;
+    public static final int OPTION_DIRECT_BUFFER = 0x0004;
+
+
+    protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(ReceiverBase.class);
+    
+    protected MessageListener listener;
+    protected String host;
+    protected InetAddress bind;
+    protected int port;
+    protected boolean sendack;
+    protected boolean sync;
+    protected int rxBufSize = 43800;
+    protected int txBufSize = 25188;
+    protected int tcpThreadCount;
+    protected boolean doListen = false;
+    protected ThreadPool pool;
+    protected boolean direct = true;
+    protected long tcpSelectorTimeout;
+
+    public ReceiverBase() {
+    }
+    
+    /**
+     * getMessageListener
+     *
+     * @return MessageListener
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public MessageListener getMessageListener() {
+        return listener;
+    }
+
+    /**
+     *
+     * @return The port
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public int getPort() {
+        return port;
+    }
+
+    public int getRxBufSize() {
+        return rxBufSize;
+    }
+
+    public int getTxBufSize() {
+        return txBufSize;
+    }
+
+    public int getTcpThreadCount() {
+        return tcpThreadCount;
+    }
+
+    /**
+     *
+     * @return boolean
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public boolean getSendAck() {
+        return sendack;
+    }
+
+    /**
+     * setMessageListener
+     *
+     * @param listener MessageListener
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public void setMessageListener(MessageListener listener) {
+        this.listener = listener;
+    }
+
+
+    /**
+     *
+     * @param isSendAck boolean
+     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+     */
+    public void setSendAck(boolean sendAck) {
+        this.sendack = sendAck;
+    }
+    public void setTcpListenPort(int tcpListenPort) {
+        this.port = tcpListenPort;
+    }
+
+    public void setTcpListenAddress(String tcpListenHost) {
+        this.host = tcpListenHost;
+    }
+
+    public void setSynchronized(boolean sync) {
+        this.sync = sync;
+    }
+
+    public void setRxBufSize(int rxBufSize) {
+        this.rxBufSize = rxBufSize;
+    }
+
+    public void setTxBufSize(int txBufSize) {
+        this.txBufSize = txBufSize;
+    }
+
+    public void setTcpThreadCount(int tcpThreadCount) {
+        this.tcpThreadCount = tcpThreadCount;
+    }
+
+    /**
+     * @return Returns the bind.
+     */
+    public InetAddress getBind() {
+        if (bind == null) {
+            try {
+                if ("auto".equals(host)) {
+                    host = 
java.net.InetAddress.getLocalHost().getHostAddress();
+                }
+                if (log.isDebugEnabled())
+                    log.debug("Starting replication listener on address:"+ 
host);
+                bind = java.net.InetAddress.getByName(host);
+            } catch (IOException ioe) {
+                log.error("Failed bind replication listener on address:"+ 
host, ioe);
+            }
+        }
+        return bind;
+    }
+    
+    /**
+     * recursive bind to find the next available port
+     * @param socket ServerSocket
+     * @param portstart int
+     * @param retries int
+     * @return int
+     * @throws IOException
+     */
+    protected int bind(ServerSocket socket, int portstart, int retries) throws 
IOException {
+        while ( retries > 0 ) {
+            try {
+                InetSocketAddress addr = new InetSocketAddress(getBind(), 
portstart);
+                socket.bind(addr);
+                setTcpListenPort(portstart);
+                log.info("Nio Server Socket bound to:"+addr);
+                return 0;
+            }catch ( IOException x) {
+                retries--;
+                if ( retries <= 0 ) throw x;
+                portstart++;
+                retries = bind(socket,portstart,retries);
+            }
+        }
+        return retries;
+    }
+    
+    public void messageDataReceived(ChannelMessage data) {
+        if ( this.listener != null ) {
+            listener.messageReceived(data);
+        }
+    }
+    
+    public int getWorkerThreadOptions() {
+        int options = 0;
+        if ( getSynchronized() ) options = options |OPTION_SYNCHRONIZED;
+        if ( getSendAck() ) options = options |OPTION_SEND_ACK;
+        if ( getDirect() ) options = options | OPTION_DIRECT_BUFFER;
+        return options;
+    }
+
+
+    /**
+     * @param bind The bind to set.
+     */
+    public void setBind(java.net.InetAddress bind) {
+        this.bind = bind;
+    }
+
+
+    public int getTcpListenPort() {
+        return this.port;
+    }
+
+    public boolean isSync() {
+        return sync;
+    }
+
+    public boolean getDirect() {
+        return direct;
+    }
+
+
+
+    public void setDirect(boolean direct) {
+        this.direct = direct;
+    }
+
+
+    public boolean getSynchronized() {
+        return this.sync;
+    }
+
+
+
+    public String getHost() {
+        getBind();
+        return this.host;
+    }
+
+    public long getTcpSelectorTimeout() {
+        return tcpSelectorTimeout;
+    }
+
+    public void setTcpSelectorTimeout(long selTimeout) {
+        tcpSelectorTimeout = selTimeout;
+    }
+    /* (non-Javadoc)
+     * @see org.apache.catalina.tribes.io.ListenCallback#sendAck()
+     */
+    public void sendAck() throws IOException {
+        // do nothing
+    }
+
+
+}
\ No newline at end of file

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=385849&r1=385848&r2=385849&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
 Tue Mar 14 10:41:41 2006
@@ -27,6 +27,7 @@
 import org.apache.catalina.tribes.ChannelMessage;
 import java.net.Socket;
 import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
 
 /**
  * <p>Title: </p>
@@ -40,104 +41,21 @@
  * @author not attributable
  * @version 1.0
  */
-public class BioReceiver implements Runnable, ChannelReceiver, ListenCallback {
+public class BioReceiver extends ReceiverBase implements Runnable, 
ChannelReceiver, ListenCallback {
 
     protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(BioReceiver.class);
 
-
-    protected MessageListener listener;
-    protected String host;
-    protected InetAddress bind;
-    protected int port;
-    protected boolean sendack;
-    protected boolean sync;
-    protected int rxBufSize = 43800;
-    protected int txBufSize = 25188;    
-    protected int tcpThreadCount;
     protected ServerSocket serverSocket;
-    protected boolean doRun = true;
-    
-    protected ThreadPool pool;
 
     public BioReceiver() {
     }
 
     /**
      *
-     * @return The host
-     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
-     */
-    public String getHost() {
-        return host;
-    }
-
-    /**
-     * getMessageListener
-     *
-     * @return MessageListener
-     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
-     */
-    public MessageListener getMessageListener() {
-        return listener;
-    }
-
-    /**
-     *
-     * @return The port
-     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
-     */
-    public int getPort() {
-        return port;
-    }
-
-    public int getRxBufSize() {
-        return rxBufSize;
-    }
-
-    public int getTxBufSize() {
-        return txBufSize;
-    }
-
-    public int getTcpThreadCount() {
-        return tcpThreadCount;
-    }
-
-    /**
-     *
-     * @return boolean
-     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
-     */
-    public boolean getSendAck() {
-        return sendack;
-    }
-
-    /**
-     * setMessageListener
-     *
-     * @param listener MessageListener
-     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
-     */
-    public void setMessageListener(MessageListener listener) {
-        this.listener = listener;
-    }
-    
-
-    /**
-     *
-     * @param isSendAck boolean
-     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
-     */
-    public void setSendAck(boolean sendAck) {
-        this.sendack = sendAck;
-    }
-
-    /**
-     *
      * @throws IOException
      * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
      */
     public void start() throws IOException {
-        this.doRun = true;
         try {
             TcpReplicationThread[] receivers = new 
TcpReplicationThread[tcpThreadCount];
             for ( int i=0; i<receivers.length; i++ ) {
@@ -170,78 +88,14 @@
      * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
      */
     public void stop() {
-        this.doRun = false;
+        this.doListen = false;
         try {
             this.serverSocket.close();
         }catch ( Exception x ) {}
     }
 
-    public void setTcpListenPort(int tcpListenPort) {
-        this.port = tcpListenPort;
-    }
-
-    public void setTcpListenAddress(String tcpListenHost) {
-        this.host = tcpListenHost;
-    }
-
-    public void setSynchronized(boolean sync) {
-        this.sync = sync;
-    }
-
-    public void setRxBufSize(int rxBufSize) {
-        this.rxBufSize = rxBufSize;
-    }
-
-    public void setTxBufSize(int txBufSize) {
-        this.txBufSize = txBufSize;
-    }
-
-    public void setTcpThreadCount(int tcpThreadCount) {
-        this.tcpThreadCount = tcpThreadCount;
-    }
-    
-    public void setTcpSelectorTimeout(long timeout) {
-        //do nothing
-    }
     
-    /**
-     * @return Returns the bind.
-     */
-    public InetAddress getBind() {
-        if (bind == null) {
-            try {
-                if ("auto".equals(host)) {
-                    host = 
java.net.InetAddress.getLocalHost().getHostAddress();
-                }
-                if (log.isDebugEnabled())
-                    log.debug("Starting replication listener on address:"+ 
host);
-                bind = java.net.InetAddress.getByName(host);
-            } catch (IOException ioe) {
-                log.error("Failed bind replication listener on address:"+ 
host, ioe);
-            }
-        }
-        return bind;
-    }
     
-    protected int bind(ServerSocket socket, int portstart, int retries) throws 
IOException {
-        while ( retries > 0 ) {
-            try {
-                InetSocketAddress addr = new InetSocketAddress(getBind(), 
portstart);
-                socket.bind(addr);
-                setTcpListenPort(portstart);
-                log.info("Bio Server Socket bound to:"+addr);
-                return 0;
-            }catch ( IOException x) {
-                retries--;
-                if ( retries <= 0 ) throw x;
-                portstart++;
-                retries = bind(socket,portstart,retries);
-            }
-        }
-        return retries;
-    }
-
-
     
     protected void bind() throws IOException {
         // allocate an unbound server socket channel
@@ -251,11 +105,7 @@
         bind(serverSocket,getPort(),10);
     }
     
-    public void messageDataReceived(ChannelMessage data) {
-        if ( this.listener != null ) {
-            listener.messageReceived(data);
-        }
-    }
+    
     
     public void run() {
         try {
@@ -267,14 +117,20 @@
     }
     
     public void listen() throws Exception {
-        while ( doRun ) {
+        if (doListen) {
+            log.warn("ServerSocket already started");
+            return;
+        }
+        doListen = true;
+
+        while ( doListen ) {
             Socket socket = null;
             try {
                 socket = serverSocket.accept();
             }catch ( Exception x ) {
-                if ( doRun ) throw x;
+                if ( doListen ) throw x;
             }
-            if ( !doRun ) break;
+            if ( !doListen ) break; //regular shutdown
             if ( socket == null ) continue;
             socket.setReceiveBufferSize(rxBufSize);
             socket.setSendBufferSize(txBufSize);
@@ -293,14 +149,5 @@
         }//while
     }
     
-    public int getWorkerThreadOptions() {
-        int options = 0;
-        if ( sync ) options = options 
|TcpReplicationThread.OPTION_SYNCHRONIZED;
-        if ( getSendAck() ) options = options 
|TcpReplicationThread.OPTION_SEND_ACK;
-        return options;
-    }
-
-
-
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java?rev=385849&r1=385848&r2=385849&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
 Tue Mar 14 10:41:41 2006
@@ -22,6 +22,7 @@
 import org.apache.catalina.tribes.tcp.nio.WorkerThread;
 import java.net.Socket;
 import java.io.InputStream;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
 
 /**
  * A worker thread class which can drain channels and echo-back the input. Each
@@ -38,9 +39,9 @@
  * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 
2006) $
  */
 public class TcpReplicationThread extends WorkerThread {
-    public static final int OPTION_SEND_ACK = 0x0001;
-    public static final int OPTION_SYNCHRONIZED = 0x0002;
-    public static final int OPTION_DIRECT_BUFFER = 0x0004;
+    public static final int OPTION_SEND_ACK = ReceiverBase.OPTION_SEND_ACK;
+    public static final int OPTION_SYNCHRONIZED = 
ReceiverBase.OPTION_SYNCHRONIZED;
+    public static final int OPTION_DIRECT_BUFFER = 
ReceiverBase.OPTION_DIRECT_BUFFER;
 
 
     protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=385849&r1=385848&r2=385849&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
 Tue Mar 14 10:41:41 2006
@@ -33,20 +33,13 @@
 import org.apache.catalina.tribes.io.ObjectReader;
 import org.apache.catalina.tribes.tcp.Constants;
 import org.apache.catalina.util.StringManager;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
 
 /**
  * @author Filip Hanik
  * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 
2006) $
  */
-public class NioReceiver implements Runnable, ChannelReceiver, ListenCallback {
-    /**
-     * @todo make this configurable
-     */
-    protected int rxBufSize = 43800;
-    /**
-     * We are only sending acks
-     */
-    protected int txBufSize = 25188;
+public class NioReceiver extends ReceiverBase implements Runnable, 
ChannelReceiver, ListenCallback {
 
     protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(NioReceiver.class);
 
@@ -60,24 +53,12 @@
      */
     private static final String info = "NioReceiver/1.0";
 
-    private ThreadPool pool = null;
-    private int tcpThreadCount;
-    private long tcpSelectorTimeout;
     private Selector selector = null;
     private ServerSocketChannel serverChannel = null;
 
-    private java.net.InetAddress bind;
-    private String tcpListenAddress;
-    private int tcpListenPort;
-    private boolean sendAck;
-    protected boolean doListen = false;
-    
-
 
     private Object interestOpsMutex = new Object();
-    private MessageListener listener = null;
-    private boolean sync;
-    private boolean direct;
+
     public NioReceiver() {
     }
 
@@ -87,25 +68,7 @@
      * <code>&lt;description&gt;/&lt;version&gt;</code>.
      */
     public String getInfo() {
-
         return (info);
-
-    }
-
-    public long getTcpSelectorTimeout() {
-        return tcpSelectorTimeout;
-    }
-
-    public void setTcpSelectorTimeout(long tcpSelectorTimeout) {
-        this.tcpSelectorTimeout = tcpSelectorTimeout;
-    }
-
-    public int getTcpThreadCount() {
-        return tcpThreadCount;
-    }
-
-    public void setTcpThreadCount(int tcpThreadCount) {
-        this.tcpThreadCount = tcpThreadCount;
     }
 
     public Object getInterestOpsMutex() {
@@ -145,31 +108,7 @@
         }
     }
     
-    /**
-     * recursive bind to find the next available port
-     * @param socket ServerSocket
-     * @param portstart int
-     * @param retries int
-     * @return int
-     * @throws IOException
-     */
-    protected int bind(ServerSocket socket, int portstart, int retries) throws 
IOException {
-        while ( retries > 0 ) {
-            try {
-                InetSocketAddress addr = new InetSocketAddress(getBind(), 
portstart);
-                socket.bind(addr);
-                setTcpListenPort(portstart);
-                log.info("Nio Server Socket bound to:"+addr);
-                return 0;
-            }catch ( IOException x) {
-                retries--;
-                if ( retries <= 0 ) throw x;
-                portstart++;
-                retries = bind(socket,portstart,retries);
-            }
-        }
-        return retries;
-    }
+    
     
     protected void bind() throws IOException {
         // allocate an unbound server socket channel
@@ -195,7 +134,7 @@
      */
     protected void listen() throws Exception {
         if (doListen) {
-            log.warn("ServerSocketChannel allready started");
+            log.warn("ServerSocketChannel already started");
             return;
         }
         
@@ -206,7 +145,7 @@
             // selected set contains keys of the ready channels
             try {
 
-                int n = selector.select(tcpSelectorTimeout);
+                int n = selector.select(getTcpSelectorTimeout());
                 if (n == 0) {
                     //there is a good chance that we got here
                     //because the TcpReplicationThread called
@@ -339,139 +278,5 @@
         }
     }
 
-    public void messageDataReceived(ChannelMessage data) {
-        if ( this.listener != null ) {
-            listener.messageReceived(data);
-        }
-    }
-
-    /**
-     * @return Returns the bind.
-     */
-    public java.net.InetAddress getBind() {
-        if (bind == null) {
-            try {
-                if ("auto".equals(tcpListenAddress)) {
-                    tcpListenAddress = java.net.InetAddress.getLocalHost()
-                                       .getHostAddress();
-                }
-                if (log.isDebugEnabled())
-                    log.debug("Starting replication listener on address:"+ 
tcpListenAddress);
-                bind = java.net.InetAddress.getByName(tcpListenAddress);
-            } catch (IOException ioe) {
-                log.error("Failed bind replication listener on address:"+ 
tcpListenAddress, ioe);
-            }
-        }
-        return bind;
-    }
-
-    /**
-     * @param bind The bind to set.
-     */
-    public void setBind(java.net.InetAddress bind) {
-        this.bind = bind;
-    }
-
-    /**
-     * Send ACK to sender
-     *
-     * @return True if sending ACK
-     */
-    public boolean getSendAck() {
-        return sendAck;
-    }
-
-    /**
-     * set ack mode or not!
-     *
-     * @param sendAck
-     */
-    public void setSendAck(boolean sendAck) {
-        this.sendAck = sendAck;
-    }
-
-    public String getTcpListenAddress() {
-        return tcpListenAddress;
-    }
-
-    public void setTcpListenAddress(String tcpListenAddress) {
-        this.tcpListenAddress = tcpListenAddress;
-    }
-
-    public int getTcpListenPort() {
-        return tcpListenPort;
-    }
-
-    public boolean isSync() {
-        return sync;
-    }
-
-    public boolean getDirect() {
-        return direct;
-    }
-
-    public int getRxBufSize() {
-        return rxBufSize;
-    }
-
-    public int getTxBufSize() {
-        return txBufSize;
-    }
-
-    public MessageListener getMessageListener() {
-        return listener;
-    }
-
-    public void setTcpListenPort(int tcpListenPort) {
-        this.tcpListenPort = tcpListenPort;
-    }
-
-    public void setDirect(boolean direct) {
-        this.direct = direct;
-    }
-
-    public void setRxBufSize(int rxBufSize) {
-        this.rxBufSize = rxBufSize;
-    }
-
-    public void setTxBufSize(int txBufSize) {
-        this.txBufSize = txBufSize;
-    }
-
-    public void setSynchronized(boolean sync) {
-        this.sync = sync;
-    }
-
-    public boolean getSynchronized() {
-        return this.sync;
-    }
-    
-    public int getWorkerThreadOptions() {
-        int options = 0;
-        if ( getSynchronized() ) options = options 
|TcpReplicationThread.OPTION_SYNCHRONIZED;
-        if ( getSendAck() ) options = options 
|TcpReplicationThread.OPTION_SEND_ACK;
-        if ( getDirect() ) options = options | 
TcpReplicationThread.OPTION_DIRECT_BUFFER;
-        return options;
-    }
-
-    public void setMessageListener(MessageListener listener) {
-        this.listener = listener;
-    }
-
-    public String getHost() {
-        getBind();
-        return getTcpListenAddress();
-    }
-
-    public int getPort() {
-        return getTcpListenPort();
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.catalina.tribes.io.ListenCallback#sendAck()
-     */
-    public void sendAck() throws IOException {
-        // do nothing
-    }
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java?rev=385849&r1=385848&r2=385849&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java
 Tue Mar 14 10:41:41 2006
@@ -22,6 +22,7 @@
 
 import org.apache.catalina.tribes.io.ObjectReader;
 import org.apache.catalina.tribes.tcp.Constants;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
 
 /**
  * A worker thread class which can drain channels and echo-back the input. Each
@@ -38,9 +39,10 @@
  * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 
2006) $
  */
 public class TcpReplicationThread extends WorkerThread {
-    public static final int OPTION_SEND_ACK = 0x0001;
-    public static final int OPTION_SYNCHRONIZED = 0x0002;
-    public static final int OPTION_DIRECT_BUFFER = 0x0004;
+    public static final int OPTION_SEND_ACK = ReceiverBase.OPTION_SEND_ACK;
+    public static final int OPTION_SYNCHRONIZED = 
ReceiverBase.OPTION_SYNCHRONIZED;
+    public static final int OPTION_DIRECT_BUFFER = 
ReceiverBase.OPTION_DIRECT_BUFFER;
+
 
     
     private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=385849&r1=385848&r2=385849&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
 Tue Mar 14 10:41:41 2006
@@ -28,7 +28,9 @@
 import org.apache.catalina.tribes.tcp.MultiPointSender;
 import org.apache.catalina.tribes.tcp.ReplicationTransmitter;
 import org.apache.catalina.tribes.tcp.nio.NioReceiver;
+import org.apache.catalina.tribes.tcp.bio.BioReceiver;
 import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
 
 /**
  * <p>Title: </p>
@@ -53,7 +55,8 @@
            .append("\n\t\t[-ackto acktimeout]") 
            .append("\n\t\t[-autoconnect true|false]")
            .append("\n\t\t[-sync true|false]")
-           .append("\n\t\t[-transport 
org.apache.catalina.tribes.tcp.nio.ParallelNioSender]")
+           .append("\n\t\t[-receiver 
org.apache.catalina.tribes.tcp.nio.NioReceiver|org.apache.catalina.tribes.tcp.bio.BioReceiver|]")
+           .append("\n\t\t[-transport 
org.apache.catalina.tribes.tcp.nio.PooledParallelSender|org.apache.catalina.tribes.tcp.bio.PooledMultipointSender]")
            .append("\n\t\t[-transport.xxx transport specific property]")
            .append("\n\t\t[-maddr multicastaddr]")
            .append("\n\t\t[-mport multicastport]")
@@ -90,6 +93,7 @@
         int fragsize = 1024;
         Properties transportProperties = new Properties();
         String transport = 
"org.apache.catalina.tribes.tcp.nio.PooledParallelSender";
+        String receiver = "org.apache.catalina.tribes.tcp.nio.NioReceiver";
         
         for (int i = 0; i < args.length; i++) {
             if ("-bind".equals(args[i])) {
@@ -126,6 +130,8 @@
                 String key = args[i];
                 String val = args[++i];
                 transportProperties.setProperty(key,val);
+            } else if ("-receiver".equals(args[i])) {
+                receiver = args[++i];
             } else if ("-maddr".equals(args[i])) {
                 mcastaddr = args[++i];
             } else if ("-mport".equals(args[i])) {
@@ -138,15 +144,19 @@
                 mbind = args[++i];
             }
         }
-
-        NioReceiver rl = new NioReceiver();
-        rl.setTcpListenAddress(bind);
-        rl.setTcpListenPort(port);
-        rl.setTcpSelectorTimeout(tcpseltimeout);
-        rl.setTcpThreadCount(tcpthreadcount);
-        rl.getBind();
-        rl.setSendAck(ack);
-        rl.setSynchronized(sync);
+        
+        System.out.println("Creating receiver class="+receiver);
+        Class cl = 
Class.forName(receiver,true,ChannelCreator.class.getClassLoader());
+        ReceiverBase rx = (ReceiverBase)cl.newInstance();
+        rx.setTcpListenAddress(bind);
+        rx.setTcpListenPort(port);
+        rx.setTcpSelectorTimeout(tcpseltimeout);
+        rx.setTcpThreadCount(tcpthreadcount);
+        rx.getBind();
+        rx.setSendAck(ack);
+        rx.setSynchronized(sync);
+        rx.setRxBufSize(43800);
+        rx.setTxBufSize(25188);
 
         
         ReplicationTransmitter ps = new ReplicationTransmitter();
@@ -174,7 +184,7 @@
         service.setMcastPort(mcastport);
 
         ManagedChannel channel = new GroupChannel();
-        channel.setChannelReceiver(rl);
+        channel.setChannelReceiver(rx);
         channel.setChannelSender(ps);
         channel.setMembershipService(service);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to