Author: fhanik
Date: Thu Mar 2 12:53:55 2006
New Revision: 382509
URL: http://svn.apache.org/viewcvs?rev=382509&view=rev
Log:
more refactoring
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=382509&r1=382508&r2=382509&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Thu Mar 2 12:53:55 2006
@@ -32,5 +32,8 @@
public interface DataSender {
public void connect() throws ChannelException;
public void disconnect();
+ public boolean isConnected();
+ public void setRxBufSize(int size);
+ public void setTxBufSize(int size);
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java?rev=382509&r1=382508&r2=382509&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java Thu Mar 2 12:53:55 2006
@@ -22,6 +22,9 @@
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.tcp.PooledSender;
+import org.apache.catalina.tribes.tcp.DataSender;
+import org.apache.catalina.tribes.tcp.SenderState;
/**
* Send cluster messages with a pool of sockets (25).
@@ -31,7 +34,7 @@
* @version 1.2
*/
-public class PooledSocketSender extends SinglePointDataSender {
+public class PooledSocketSender extends PooledSender {
private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory
.getLog(org.apache.catalina.tribes.tcp.bio.PooledSocketSender.class);
@@ -44,9 +47,16 @@
// ----------------------------------------------------- Instance Variables
private int maxPoolSocketLimit = 25;
+ private String domain;
+ private InetAddress host;
+ private int port;
+ private SenderState senderState;
+ private int keepAliveMaxRequestCount;
+ private long keepAliveTimeout;
+ private long ackTimeout;
+ private boolean resend;
+ private boolean waitForAck;
- private SenderQueue senderQueue = null;
-
// ----------------------------------------------------- Constructor
/**
@@ -54,9 +64,12 @@
* @param host replication node tcp address
* @param port replication node tcp port
*/
- public PooledSocketSender(String domain,InetAddress host, int port) {
- super(domain,host, port);
- senderQueue = new SenderQueue(this, maxPoolSocketLimit);
+ public PooledSocketSender(String domain,InetAddress host, int port, int poolSize) {
+ super(poolSize);
+ this.host = host;
+ this.domain = domain;
+ this.port = port;
+ this.maxPoolSocketLimit = poolSize;
}
// ----------------------------------------------------- Public Properties
@@ -72,36 +85,82 @@
}
- public void setMaxPoolSocketLimit(int limit) {
- maxPoolSocketLimit = limit;
- senderQueue.setLimit(limit);
+ public void setDomain(String domain) {
+ this.domain = domain;
}
- public int getMaxPoolSocketLimit() {
- return maxPoolSocketLimit;
+ public void setHost(InetAddress host) {
+ this.host = host;
}
- public int getInPoolSize() {
- return senderQueue.getInPoolSize();
+ public void setPort(int port) {
+ this.port = port;
}
- public int getInUsePoolSize() {
- return senderQueue.getInUsePoolSize();
+ public void setSenderState(SenderState senderState) {
+ this.senderState = senderState;
}
- // ----------------------------------------------------- Public Methode
+ public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
+ this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
+ }
+
+ public void setKeepAliveTimeout(long keepAliveTimeout) {
+ this.keepAliveTimeout = keepAliveTimeout;
+ }
+
+ public void setAckTimeout(long ackTimeout) {
+ this.ackTimeout = ackTimeout;
+ }
+
+ public void setResend(boolean resend) {
+ this.resend = resend;
+ }
+
+ public void setWaitForAck(boolean waitForAck) {
+ this.waitForAck = waitForAck;
+ }
+
+ public String getDomain() {
+ return domain;
+ }
+
+ public InetAddress getHost() {
+ return host;
+ }
- public synchronized void connect() throws ChannelException {
- //do nothing, happens in the socket sender itself
- senderQueue.open();
- setSocketConnected(true);
+ public int getPort() {
+ return port;
}
- public synchronized void disconnect() {
- senderQueue.close();
- setSocketConnected(false);
+ public SenderState getSenderState() {
+ return senderState;
}
+ public int getKeepAliveMaxRequestCount() {
+ return keepAliveMaxRequestCount;
+ }
+
+ public long getKeepAliveTimeout() {
+ return keepAliveTimeout;
+ }
+
+ public long getAckTimeout() {
+ return ackTimeout;
+ }
+
+ public boolean isResend() {
+ return resend;
+ }
+
+ public boolean getWaitForAck() {
+ return waitForAck;
+ }
+
+ // ----------------------------------------------------- Public Methode
+
+
+
/**
* send message and use a pool of DataSenders
*
@@ -117,9 +176,9 @@
connect();
}
}
- SinglePointDataSender sender = senderQueue.getSender(0);
+ SinglePointDataSender sender = (SinglePointDataSender)getSender();
if (sender == null) {
- log.warn(sm.getString("PoolSocketSender.noMoreSender", this.getAddress(), new Integer(this.getPort())));
+ log.warn("Sender queue is empty. Can not send any messages.");
return;
}
//send the message
@@ -127,141 +186,30 @@
sender.sendMessage(data);
} finally {
//return the connection to the pool
- senderQueue.returnSender(sender);
+ returnSender(sender);
}
}
public String toString() {
StringBuffer buf = new StringBuffer("PooledSocketSender[");
- buf.append(getAddress()).append(":").append(getPort()).append("]");
+ buf.append(getHost()).append(":").append(getPort()).append("]");
return buf.toString();
}
- // ----------------------------------------------------- Inner Class
-
- private class SenderQueue {
- private int limit = 25;
-
- PooledSocketSender parent = null;
-
- private LinkedList queue = new LinkedList();
-
- private LinkedList inuse = new LinkedList();
-
- private Object mutex = new Object();
-
- private boolean isOpen = true;
-
- public SenderQueue(PooledSocketSender parent, int limit) {
- this.limit = limit;
- this.parent = parent;
- }
-
- /**
- * @return Returns the limit.
- */
- public int getLimit() {
- return limit;
- }
- /**
- * @param limit The limit to set.
- */
- public void setLimit(int limit) {
- this.limit = limit;
- }
- /**
- * @return
- */
- public int getInUsePoolSize() {
- return inuse.size();
- }
-
- /**
- * @return
- */
- public int getInPoolSize() {
- return queue.size();
- }
-
- public SinglePointDataSender getSender(long timeout) {
- SinglePointDataSender sender = null;
- long start = System.currentTimeMillis();
- long delta = 0;
- do {
- synchronized (mutex) {
- if (!isOpen)
- throw new IllegalStateException(
- "Socket pool is closed.");
- if (queue.size() > 0) {
- sender = (SinglePointDataSender) queue.removeFirst();
- } else if (inuse.size() < limit) {
- sender = getNewDataSender();
- } else {
- try {
- mutex.wait(timeout);
- } catch (Exception x) {
- PooledSocketSender.log.warn(sm.getString("PoolSocketSender.senderQueue.sender.failed",parent.getAddress(),new Integer(parent.getPort())),x);
- }//catch
- }//end if
- if (sender != null) {
- inuse.add(sender);
- }
- }//synchronized
- delta = System.currentTimeMillis() - start;
- } while ((isOpen) && (sender == null)
- && (timeout == 0 ? true : (delta < timeout)));
- //to do
- return sender;
- }
-
- public void returnSender(SinglePointDataSender sender) {
- //to do
- synchronized (mutex) {
- queue.add(sender);
- inuse.remove(sender);
- mutex.notify();
- }
- }
-
- private SinglePointDataSender getNewDataSender() {
- //new DataSender(
+ public DataSender getNewDataSender() {
+ //new DataSender(
SinglePointDataSender sender = new SinglePointDataSender(getDomain(),
- parent.getAddress(),
- parent.getPort(),
- parent.getSenderState() );
- sender.setKeepAliveMaxRequestCount(parent.getKeepAliveMaxRequestCount());
- sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
- sender.setAckTimeout(parent.getAckTimeout());
- sender.setWaitForAck(parent.getWaitForAck());
- sender.setResend(parent.isResend());
- sender.setRxBufSize(parent.getRxBufSize());
- sender.setTxBufSize(parent.getTxBufSize());
+ getHost(),
+ getPort(),
+ getSenderState() );
+ sender.setKeepAliveMaxRequestCount(getKeepAliveMaxRequestCount());
+ sender.setKeepAliveTimeout(getKeepAliveTimeout());
+ sender.setAckTimeout(getAckTimeout());
+ sender.setWaitForAck(getWaitForAck());
+ sender.setResend(isResend());
+ sender.setRxBufSize(getRxBufSize());
+ sender.setTxBufSize(getTxBufSize());
return sender;
- }
-
- public void close() {
- synchronized (mutex) {
- for (int i = 0; i < queue.size(); i++) {
- SinglePointDataSender sender = (SinglePointDataSender) queue.get(i);
- sender.disconnect();
- }//for
- for (int i = 0; i < inuse.size(); i++) {
- SinglePointDataSender sender = (SinglePointDataSender) inuse.get(i);
- sender.disconnect();
- }//for
- queue.clear();
- inuse.clear();
- isOpen = false;
- mutex.notifyAll();
- }
- }
-
- public void open() {
- synchronized (mutex) {
- isOpen = true;
- mutex.notifyAll();
- }
- }
}
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=382509&r1=382508&r2=382509&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar 2 12:53:55 2006
@@ -17,16 +17,16 @@
import java.io.IOException;
+import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashMap;
+import java.util.Iterator;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.io.ClusterData;
import org.apache.catalina.tribes.io.XByteBuffer;
-import java.util.Iterator;
-import java.nio.channels.SelectionKey;
import org.apache.catalina.tribes.tcp.MultiPointSender;
/**
@@ -53,7 +53,8 @@
protected int rxBufSize = 43800;
protected int txBufSize = 25188;
protected boolean suspect = false;
-
+ private boolean connected;
+
public ParallelNioSender(long timeout,
boolean waitForAck,
int retryAttempts,
@@ -189,6 +190,7 @@
public void connect() {
//do nothing, we connect on demand
+ setConnected(true);
}
@@ -211,6 +213,7 @@
public synchronized void disconnect() {
try {close(); }catch (Exception x){}
+ setConnected(false);
}
public void finalize() {
@@ -220,6 +223,11 @@
public boolean getSuspect() {
return suspect;
}
+
+ public boolean isConnected() {
+ return connected;
+ }
+
public void setSuspect(boolean suspect) {
this.suspect = suspect;
}
@@ -247,7 +255,9 @@
public void setTimeout(long timeout) {
this.timeout = timeout;
}
-
-
+
+ public void setConnected(boolean connected) {
+ this.connected = connected;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]