Author: fhanik
Date: Tue Mar 14 10:41:41 2006
New Revision: 385849
URL: http://svn.apache.org/viewcvs?rev=385849&view=rev
Log:
Refactored receivers both NIO and blocking IO into a receiver base
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=385849&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
Tue Mar 14 10:41:41 2006
@@ -0,0 +1,263 @@
+/*
+ * Copyright 1999,2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.catalina.tribes.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.MessageListener;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.tcp.nio.ThreadPool;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public abstract class ReceiverBase implements ChannelReceiver, ListenCallback {
+
+ public static final int OPTION_SEND_ACK = 0x0001;
+ public static final int OPTION_SYNCHRONIZED = 0x0002;
+ public static final int OPTION_DIRECT_BUFFER = 0x0004;
+
+
+ protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(ReceiverBase.class);
+
+ protected MessageListener listener;
+ protected String host;
+ protected InetAddress bind;
+ protected int port;
+ protected boolean sendack;
+ protected boolean sync;
+ protected int rxBufSize = 43800;
+ protected int txBufSize = 25188;
+ protected int tcpThreadCount;
+ protected boolean doListen = false;
+ protected ThreadPool pool;
+ protected boolean direct = true;
+ protected long tcpSelectorTimeout;
+
+ public ReceiverBase() {
+ }
+
+ /**
+ * getMessageListener
+ *
+ * @return MessageListener
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public MessageListener getMessageListener() {
+ return listener;
+ }
+
+ /**
+ *
+ * @return The port
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public int getPort() {
+ return port;
+ }
+
+ public int getRxBufSize() {
+ return rxBufSize;
+ }
+
+ public int getTxBufSize() {
+ return txBufSize;
+ }
+
+ public int getTcpThreadCount() {
+ return tcpThreadCount;
+ }
+
+ /**
+ *
+ * @return boolean
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public boolean getSendAck() {
+ return sendack;
+ }
+
+ /**
+ * setMessageListener
+ *
+ * @param listener MessageListener
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public void setMessageListener(MessageListener listener) {
+ this.listener = listener;
+ }
+
+
+ /**
+ *
+ * @param isSendAck boolean
+ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
+ */
+ public void setSendAck(boolean sendAck) {
+ this.sendack = sendAck;
+ }
+ public void setTcpListenPort(int tcpListenPort) {
+ this.port = tcpListenPort;
+ }
+
+ public void setTcpListenAddress(String tcpListenHost) {
+ this.host = tcpListenHost;
+ }
+
+ public void setSynchronized(boolean sync) {
+ this.sync = sync;
+ }
+
+ public void setRxBufSize(int rxBufSize) {
+ this.rxBufSize = rxBufSize;
+ }
+
+ public void setTxBufSize(int txBufSize) {
+ this.txBufSize = txBufSize;
+ }
+
+ public void setTcpThreadCount(int tcpThreadCount) {
+ this.tcpThreadCount = tcpThreadCount;
+ }
+
+ /**
+ * @return Returns the bind.
+ */
+ public InetAddress getBind() {
+ if (bind == null) {
+ try {
+ if ("auto".equals(host)) {
+ host =
java.net.InetAddress.getLocalHost().getHostAddress();
+ }
+ if (log.isDebugEnabled())
+ log.debug("Starting replication listener on address:"+
host);
+ bind = java.net.InetAddress.getByName(host);
+ } catch (IOException ioe) {
+ log.error("Failed bind replication listener on address:"+
host, ioe);
+ }
+ }
+ return bind;
+ }
+
+ /**
+ * recursive bind to find the next available port
+ * @param socket ServerSocket
+ * @param portstart int
+ * @param retries int
+ * @return int
+ * @throws IOException
+ */
+ protected int bind(ServerSocket socket, int portstart, int retries) throws
IOException {
+ while ( retries > 0 ) {
+ try {
+ InetSocketAddress addr = new InetSocketAddress(getBind(),
portstart);
+ socket.bind(addr);
+ setTcpListenPort(portstart);
+ log.info("Nio Server Socket bound to:"+addr);
+ return 0;
+ }catch ( IOException x) {
+ retries--;
+ if ( retries <= 0 ) throw x;
+ portstart++;
+ retries = bind(socket,portstart,retries);
+ }
+ }
+ return retries;
+ }
+
+ public void messageDataReceived(ChannelMessage data) {
+ if ( this.listener != null ) {
+ listener.messageReceived(data);
+ }
+ }
+
+ public int getWorkerThreadOptions() {
+ int options = 0;
+ if ( getSynchronized() ) options = options |OPTION_SYNCHRONIZED;
+ if ( getSendAck() ) options = options |OPTION_SEND_ACK;
+ if ( getDirect() ) options = options | OPTION_DIRECT_BUFFER;
+ return options;
+ }
+
+
+ /**
+ * @param bind The bind to set.
+ */
+ public void setBind(java.net.InetAddress bind) {
+ this.bind = bind;
+ }
+
+
+ public int getTcpListenPort() {
+ return this.port;
+ }
+
+ public boolean isSync() {
+ return sync;
+ }
+
+ public boolean getDirect() {
+ return direct;
+ }
+
+
+
+ public void setDirect(boolean direct) {
+ this.direct = direct;
+ }
+
+
+ public boolean getSynchronized() {
+ return this.sync;
+ }
+
+
+
+ public String getHost() {
+ getBind();
+ return this.host;
+ }
+
+ public long getTcpSelectorTimeout() {
+ return tcpSelectorTimeout;
+ }
+
+ public void setTcpSelectorTimeout(long selTimeout) {
+ tcpSelectorTimeout = selTimeout;
+ }
+ /* (non-Javadoc)
+ * @see org.apache.catalina.tribes.io.ListenCallback#sendAck()
+ */
+ public void sendAck() throws IOException {
+ // do nothing
+ }
+
+
+}
\ No newline at end of file
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=385849&r1=385848&r2=385849&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
Tue Mar 14 10:41:41 2006
@@ -27,6 +27,7 @@
import org.apache.catalina.tribes.ChannelMessage;
import java.net.Socket;
import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
/**
* <p>Title: </p>
@@ -40,104 +41,21 @@
* @author not attributable
* @version 1.0
*/
-public class BioReceiver implements Runnable, ChannelReceiver, ListenCallback {
+public class BioReceiver extends ReceiverBase implements Runnable,
ChannelReceiver, ListenCallback {
protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(BioReceiver.class);
-
- protected MessageListener listener;
- protected String host;
- protected InetAddress bind;
- protected int port;
- protected boolean sendack;
- protected boolean sync;
- protected int rxBufSize = 43800;
- protected int txBufSize = 25188;
- protected int tcpThreadCount;
protected ServerSocket serverSocket;
- protected boolean doRun = true;
-
- protected ThreadPool pool;
public BioReceiver() {
}
/**
*
- * @return The host
- * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
- */
- public String getHost() {
- return host;
- }
-
- /**
- * getMessageListener
- *
- * @return MessageListener
- * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
- */
- public MessageListener getMessageListener() {
- return listener;
- }
-
- /**
- *
- * @return The port
- * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
- */
- public int getPort() {
- return port;
- }
-
- public int getRxBufSize() {
- return rxBufSize;
- }
-
- public int getTxBufSize() {
- return txBufSize;
- }
-
- public int getTcpThreadCount() {
- return tcpThreadCount;
- }
-
- /**
- *
- * @return boolean
- * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
- */
- public boolean getSendAck() {
- return sendack;
- }
-
- /**
- * setMessageListener
- *
- * @param listener MessageListener
- * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
- */
- public void setMessageListener(MessageListener listener) {
- this.listener = listener;
- }
-
-
- /**
- *
- * @param isSendAck boolean
- * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
- */
- public void setSendAck(boolean sendAck) {
- this.sendack = sendAck;
- }
-
- /**
- *
* @throws IOException
* @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
*/
public void start() throws IOException {
- this.doRun = true;
try {
TcpReplicationThread[] receivers = new
TcpReplicationThread[tcpThreadCount];
for ( int i=0; i<receivers.length; i++ ) {
@@ -170,78 +88,14 @@
* @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
*/
public void stop() {
- this.doRun = false;
+ this.doListen = false;
try {
this.serverSocket.close();
}catch ( Exception x ) {}
}
- public void setTcpListenPort(int tcpListenPort) {
- this.port = tcpListenPort;
- }
-
- public void setTcpListenAddress(String tcpListenHost) {
- this.host = tcpListenHost;
- }
-
- public void setSynchronized(boolean sync) {
- this.sync = sync;
- }
-
- public void setRxBufSize(int rxBufSize) {
- this.rxBufSize = rxBufSize;
- }
-
- public void setTxBufSize(int txBufSize) {
- this.txBufSize = txBufSize;
- }
-
- public void setTcpThreadCount(int tcpThreadCount) {
- this.tcpThreadCount = tcpThreadCount;
- }
-
- public void setTcpSelectorTimeout(long timeout) {
- //do nothing
- }
- /**
- * @return Returns the bind.
- */
- public InetAddress getBind() {
- if (bind == null) {
- try {
- if ("auto".equals(host)) {
- host =
java.net.InetAddress.getLocalHost().getHostAddress();
- }
- if (log.isDebugEnabled())
- log.debug("Starting replication listener on address:"+
host);
- bind = java.net.InetAddress.getByName(host);
- } catch (IOException ioe) {
- log.error("Failed bind replication listener on address:"+
host, ioe);
- }
- }
- return bind;
- }
- protected int bind(ServerSocket socket, int portstart, int retries) throws
IOException {
- while ( retries > 0 ) {
- try {
- InetSocketAddress addr = new InetSocketAddress(getBind(),
portstart);
- socket.bind(addr);
- setTcpListenPort(portstart);
- log.info("Bio Server Socket bound to:"+addr);
- return 0;
- }catch ( IOException x) {
- retries--;
- if ( retries <= 0 ) throw x;
- portstart++;
- retries = bind(socket,portstart,retries);
- }
- }
- return retries;
- }
-
-
protected void bind() throws IOException {
// allocate an unbound server socket channel
@@ -251,11 +105,7 @@
bind(serverSocket,getPort(),10);
}
- public void messageDataReceived(ChannelMessage data) {
- if ( this.listener != null ) {
- listener.messageReceived(data);
- }
- }
+
public void run() {
try {
@@ -267,14 +117,20 @@
}
public void listen() throws Exception {
- while ( doRun ) {
+ if (doListen) {
+ log.warn("ServerSocket already started");
+ return;
+ }
+ doListen = true;
+
+ while ( doListen ) {
Socket socket = null;
try {
socket = serverSocket.accept();
}catch ( Exception x ) {
- if ( doRun ) throw x;
+ if ( doListen ) throw x;
}
- if ( !doRun ) break;
+ if ( !doListen ) break; //regular shutdown
if ( socket == null ) continue;
socket.setReceiveBufferSize(rxBufSize);
socket.setSendBufferSize(txBufSize);
@@ -293,14 +149,5 @@
}//while
}
- public int getWorkerThreadOptions() {
- int options = 0;
- if ( sync ) options = options
|TcpReplicationThread.OPTION_SYNCHRONIZED;
- if ( getSendAck() ) options = options
|TcpReplicationThread.OPTION_SEND_ACK;
- return options;
- }
-
-
-
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java?rev=385849&r1=385848&r2=385849&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
Tue Mar 14 10:41:41 2006
@@ -22,6 +22,7 @@
import org.apache.catalina.tribes.tcp.nio.WorkerThread;
import java.net.Socket;
import java.io.InputStream;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
/**
* A worker thread class which can drain channels and echo-back the input. Each
@@ -38,9 +39,9 @@
* @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb
2006) $
*/
public class TcpReplicationThread extends WorkerThread {
- public static final int OPTION_SEND_ACK = 0x0001;
- public static final int OPTION_SYNCHRONIZED = 0x0002;
- public static final int OPTION_DIRECT_BUFFER = 0x0004;
+ public static final int OPTION_SEND_ACK = ReceiverBase.OPTION_SEND_ACK;
+ public static final int OPTION_SYNCHRONIZED =
ReceiverBase.OPTION_SYNCHRONIZED;
+ public static final int OPTION_DIRECT_BUFFER =
ReceiverBase.OPTION_DIRECT_BUFFER;
protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=385849&r1=385848&r2=385849&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
Tue Mar 14 10:41:41 2006
@@ -33,20 +33,13 @@
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.tcp.Constants;
import org.apache.catalina.util.StringManager;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
/**
* @author Filip Hanik
* @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb
2006) $
*/
-public class NioReceiver implements Runnable, ChannelReceiver, ListenCallback {
- /**
- * @todo make this configurable
- */
- protected int rxBufSize = 43800;
- /**
- * We are only sending acks
- */
- protected int txBufSize = 25188;
+public class NioReceiver extends ReceiverBase implements Runnable,
ChannelReceiver, ListenCallback {
protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(NioReceiver.class);
@@ -60,24 +53,12 @@
*/
private static final String info = "NioReceiver/1.0";
- private ThreadPool pool = null;
- private int tcpThreadCount;
- private long tcpSelectorTimeout;
private Selector selector = null;
private ServerSocketChannel serverChannel = null;
- private java.net.InetAddress bind;
- private String tcpListenAddress;
- private int tcpListenPort;
- private boolean sendAck;
- protected boolean doListen = false;
-
-
private Object interestOpsMutex = new Object();
- private MessageListener listener = null;
- private boolean sync;
- private boolean direct;
+
public NioReceiver() {
}
@@ -87,25 +68,7 @@
* <code><description>/<version></code>.
*/
public String getInfo() {
-
return (info);
-
- }
-
- public long getTcpSelectorTimeout() {
- return tcpSelectorTimeout;
- }
-
- public void setTcpSelectorTimeout(long tcpSelectorTimeout) {
- this.tcpSelectorTimeout = tcpSelectorTimeout;
- }
-
- public int getTcpThreadCount() {
- return tcpThreadCount;
- }
-
- public void setTcpThreadCount(int tcpThreadCount) {
- this.tcpThreadCount = tcpThreadCount;
}
public Object getInterestOpsMutex() {
@@ -145,31 +108,7 @@
}
}
- /**
- * recursive bind to find the next available port
- * @param socket ServerSocket
- * @param portstart int
- * @param retries int
- * @return int
- * @throws IOException
- */
- protected int bind(ServerSocket socket, int portstart, int retries) throws
IOException {
- while ( retries > 0 ) {
- try {
- InetSocketAddress addr = new InetSocketAddress(getBind(),
portstart);
- socket.bind(addr);
- setTcpListenPort(portstart);
- log.info("Nio Server Socket bound to:"+addr);
- return 0;
- }catch ( IOException x) {
- retries--;
- if ( retries <= 0 ) throw x;
- portstart++;
- retries = bind(socket,portstart,retries);
- }
- }
- return retries;
- }
+
protected void bind() throws IOException {
// allocate an unbound server socket channel
@@ -195,7 +134,7 @@
*/
protected void listen() throws Exception {
if (doListen) {
- log.warn("ServerSocketChannel allready started");
+ log.warn("ServerSocketChannel already started");
return;
}
@@ -206,7 +145,7 @@
// selected set contains keys of the ready channels
try {
- int n = selector.select(tcpSelectorTimeout);
+ int n = selector.select(getTcpSelectorTimeout());
if (n == 0) {
//there is a good chance that we got here
//because the TcpReplicationThread called
@@ -339,139 +278,5 @@
}
}
- public void messageDataReceived(ChannelMessage data) {
- if ( this.listener != null ) {
- listener.messageReceived(data);
- }
- }
-
- /**
- * @return Returns the bind.
- */
- public java.net.InetAddress getBind() {
- if (bind == null) {
- try {
- if ("auto".equals(tcpListenAddress)) {
- tcpListenAddress = java.net.InetAddress.getLocalHost()
- .getHostAddress();
- }
- if (log.isDebugEnabled())
- log.debug("Starting replication listener on address:"+
tcpListenAddress);
- bind = java.net.InetAddress.getByName(tcpListenAddress);
- } catch (IOException ioe) {
- log.error("Failed bind replication listener on address:"+
tcpListenAddress, ioe);
- }
- }
- return bind;
- }
-
- /**
- * @param bind The bind to set.
- */
- public void setBind(java.net.InetAddress bind) {
- this.bind = bind;
- }
-
- /**
- * Send ACK to sender
- *
- * @return True if sending ACK
- */
- public boolean getSendAck() {
- return sendAck;
- }
-
- /**
- * set ack mode or not!
- *
- * @param sendAck
- */
- public void setSendAck(boolean sendAck) {
- this.sendAck = sendAck;
- }
-
- public String getTcpListenAddress() {
- return tcpListenAddress;
- }
-
- public void setTcpListenAddress(String tcpListenAddress) {
- this.tcpListenAddress = tcpListenAddress;
- }
-
- public int getTcpListenPort() {
- return tcpListenPort;
- }
-
- public boolean isSync() {
- return sync;
- }
-
- public boolean getDirect() {
- return direct;
- }
-
- public int getRxBufSize() {
- return rxBufSize;
- }
-
- public int getTxBufSize() {
- return txBufSize;
- }
-
- public MessageListener getMessageListener() {
- return listener;
- }
-
- public void setTcpListenPort(int tcpListenPort) {
- this.tcpListenPort = tcpListenPort;
- }
-
- public void setDirect(boolean direct) {
- this.direct = direct;
- }
-
- public void setRxBufSize(int rxBufSize) {
- this.rxBufSize = rxBufSize;
- }
-
- public void setTxBufSize(int txBufSize) {
- this.txBufSize = txBufSize;
- }
-
- public void setSynchronized(boolean sync) {
- this.sync = sync;
- }
-
- public boolean getSynchronized() {
- return this.sync;
- }
-
- public int getWorkerThreadOptions() {
- int options = 0;
- if ( getSynchronized() ) options = options
|TcpReplicationThread.OPTION_SYNCHRONIZED;
- if ( getSendAck() ) options = options
|TcpReplicationThread.OPTION_SEND_ACK;
- if ( getDirect() ) options = options |
TcpReplicationThread.OPTION_DIRECT_BUFFER;
- return options;
- }
-
- public void setMessageListener(MessageListener listener) {
- this.listener = listener;
- }
-
- public String getHost() {
- getBind();
- return getTcpListenAddress();
- }
-
- public int getPort() {
- return getTcpListenPort();
- }
-
- /* (non-Javadoc)
- * @see org.apache.catalina.tribes.io.ListenCallback#sendAck()
- */
- public void sendAck() throws IOException {
- // do nothing
- }
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java?rev=385849&r1=385848&r2=385849&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/TcpReplicationThread.java
Tue Mar 14 10:41:41 2006
@@ -22,6 +22,7 @@
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.tcp.Constants;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
/**
* A worker thread class which can drain channels and echo-back the input. Each
@@ -38,9 +39,10 @@
* @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb
2006) $
*/
public class TcpReplicationThread extends WorkerThread {
- public static final int OPTION_SEND_ACK = 0x0001;
- public static final int OPTION_SYNCHRONIZED = 0x0002;
- public static final int OPTION_DIRECT_BUFFER = 0x0004;
+ public static final int OPTION_SEND_ACK = ReceiverBase.OPTION_SEND_ACK;
+ public static final int OPTION_SYNCHRONIZED =
ReceiverBase.OPTION_SYNCHRONIZED;
+ public static final int OPTION_DIRECT_BUFFER =
ReceiverBase.OPTION_DIRECT_BUFFER;
+
private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
Modified:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=385849&r1=385848&r2=385849&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
Tue Mar 14 10:41:41 2006
@@ -28,7 +28,9 @@
import org.apache.catalina.tribes.tcp.MultiPointSender;
import org.apache.catalina.tribes.tcp.ReplicationTransmitter;
import org.apache.catalina.tribes.tcp.nio.NioReceiver;
+import org.apache.catalina.tribes.tcp.bio.BioReceiver;
import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
/**
* <p>Title: </p>
@@ -53,7 +55,8 @@
.append("\n\t\t[-ackto acktimeout]")
.append("\n\t\t[-autoconnect true|false]")
.append("\n\t\t[-sync true|false]")
- .append("\n\t\t[-transport
org.apache.catalina.tribes.tcp.nio.ParallelNioSender]")
+ .append("\n\t\t[-receiver
org.apache.catalina.tribes.tcp.nio.NioReceiver|org.apache.catalina.tribes.tcp.bio.BioReceiver|]")
+ .append("\n\t\t[-transport
org.apache.catalina.tribes.tcp.nio.PooledParallelSender|org.apache.catalina.tribes.tcp.bio.PooledMultipointSender]")
.append("\n\t\t[-transport.xxx transport specific property]")
.append("\n\t\t[-maddr multicastaddr]")
.append("\n\t\t[-mport multicastport]")
@@ -90,6 +93,7 @@
int fragsize = 1024;
Properties transportProperties = new Properties();
String transport =
"org.apache.catalina.tribes.tcp.nio.PooledParallelSender";
+ String receiver = "org.apache.catalina.tribes.tcp.nio.NioReceiver";
for (int i = 0; i < args.length; i++) {
if ("-bind".equals(args[i])) {
@@ -126,6 +130,8 @@
String key = args[i];
String val = args[++i];
transportProperties.setProperty(key,val);
+ } else if ("-receiver".equals(args[i])) {
+ receiver = args[++i];
} else if ("-maddr".equals(args[i])) {
mcastaddr = args[++i];
} else if ("-mport".equals(args[i])) {
@@ -138,15 +144,19 @@
mbind = args[++i];
}
}
-
- NioReceiver rl = new NioReceiver();
- rl.setTcpListenAddress(bind);
- rl.setTcpListenPort(port);
- rl.setTcpSelectorTimeout(tcpseltimeout);
- rl.setTcpThreadCount(tcpthreadcount);
- rl.getBind();
- rl.setSendAck(ack);
- rl.setSynchronized(sync);
+
+ System.out.println("Creating receiver class="+receiver);
+ Class cl =
Class.forName(receiver,true,ChannelCreator.class.getClassLoader());
+ ReceiverBase rx = (ReceiverBase)cl.newInstance();
+ rx.setTcpListenAddress(bind);
+ rx.setTcpListenPort(port);
+ rx.setTcpSelectorTimeout(tcpseltimeout);
+ rx.setTcpThreadCount(tcpthreadcount);
+ rx.getBind();
+ rx.setSendAck(ack);
+ rx.setSynchronized(sync);
+ rx.setRxBufSize(43800);
+ rx.setTxBufSize(25188);
ReplicationTransmitter ps = new ReplicationTransmitter();
@@ -174,7 +184,7 @@
service.setMcastPort(mcastport);
ManagedChannel channel = new GroupChannel();
- channel.setChannelReceiver(rl);
+ channel.setChannelReceiver(rx);
channel.setChannelSender(ps);
channel.setMembershipService(service);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]