Author: fhanik Date: Thu Mar 2 12:53:55 2006 New Revision: 382509 URL: http://svn.apache.org/viewcvs?rev=382509&view=rev Log: more refactoring
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=382509&r1=382508&r2=382509&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Thu Mar 2 12:53:55 2006 @@ -32,5 +32,8 @@ public interface DataSender { public void connect() throws ChannelException; public void disconnect(); + public boolean isConnected(); + public void setRxBufSize(int size); + public void setTxBufSize(int size); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java?rev=382509&r1=382508&r2=382509&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java Thu Mar 2 12:53:55 2006 @@ -22,6 +22,9 @@ import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.tcp.PooledSender; +import org.apache.catalina.tribes.tcp.DataSender; +import org.apache.catalina.tribes.tcp.SenderState; /** * Send cluster messages with a pool of sockets (25). @@ -31,7 +34,7 @@ * @version 1.2 */ -public class PooledSocketSender extends SinglePointDataSender { +public class PooledSocketSender extends PooledSender { private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory .getLog(org.apache.catalina.tribes.tcp.bio.PooledSocketSender.class); @@ -44,9 +47,16 @@ // ----------------------------------------------------- Instance Variables private int maxPoolSocketLimit = 25; + private String domain; + private InetAddress host; + private int port; + private SenderState senderState; + private int keepAliveMaxRequestCount; + private long keepAliveTimeout; + private long ackTimeout; + private boolean resend; + private boolean waitForAck; - private SenderQueue senderQueue = null; - // ----------------------------------------------------- Constructor /** @@ -54,9 +64,12 @@ * @param host replication node tcp address * @param port replication node tcp port */ - public PooledSocketSender(String domain,InetAddress host, int port) { - super(domain,host, port); - senderQueue = new SenderQueue(this, maxPoolSocketLimit); + public PooledSocketSender(String domain,InetAddress host, int port, int poolSize) { + super(poolSize); + this.host = host; + this.domain = domain; + this.port = port; + this.maxPoolSocketLimit = poolSize; } // ----------------------------------------------------- Public Properties @@ -72,36 +85,82 @@ } - public void setMaxPoolSocketLimit(int limit) { - maxPoolSocketLimit = limit; - senderQueue.setLimit(limit); + public void setDomain(String domain) { + this.domain = domain; } - public int getMaxPoolSocketLimit() { - return maxPoolSocketLimit; + public void setHost(InetAddress host) { + this.host = host; } - public int getInPoolSize() { - return senderQueue.getInPoolSize(); + public void setPort(int port) { + this.port = port; } - public int getInUsePoolSize() { - return senderQueue.getInUsePoolSize(); + public void setSenderState(SenderState senderState) { + this.senderState = senderState; } - // ----------------------------------------------------- Public Methode + public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) { + this.keepAliveMaxRequestCount = keepAliveMaxRequestCount; + } + + public void setKeepAliveTimeout(long keepAliveTimeout) { + this.keepAliveTimeout = keepAliveTimeout; + } + + public void setAckTimeout(long ackTimeout) { + this.ackTimeout = ackTimeout; + } + + public void setResend(boolean resend) { + this.resend = resend; + } + + public void setWaitForAck(boolean waitForAck) { + this.waitForAck = waitForAck; + } + + public String getDomain() { + return domain; + } + + public InetAddress getHost() { + return host; + } - public synchronized void connect() throws ChannelException { - //do nothing, happens in the socket sender itself - senderQueue.open(); - setSocketConnected(true); + public int getPort() { + return port; } - public synchronized void disconnect() { - senderQueue.close(); - setSocketConnected(false); + public SenderState getSenderState() { + return senderState; } + public int getKeepAliveMaxRequestCount() { + return keepAliveMaxRequestCount; + } + + public long getKeepAliveTimeout() { + return keepAliveTimeout; + } + + public long getAckTimeout() { + return ackTimeout; + } + + public boolean isResend() { + return resend; + } + + public boolean getWaitForAck() { + return waitForAck; + } + + // ----------------------------------------------------- Public Methode + + + /** * send message and use a pool of DataSenders * @@ -117,9 +176,9 @@ connect(); } } - SinglePointDataSender sender = senderQueue.getSender(0); + SinglePointDataSender sender = (SinglePointDataSender)getSender(); if (sender == null) { - log.warn(sm.getString("PoolSocketSender.noMoreSender", this.getAddress(), new Integer(this.getPort()))); + log.warn("Sender queue is empty. Can not send any messages."); return; } //send the message @@ -127,141 +186,30 @@ sender.sendMessage(data); } finally { //return the connection to the pool - senderQueue.returnSender(sender); + returnSender(sender); } } public String toString() { StringBuffer buf = new StringBuffer("PooledSocketSender["); - buf.append(getAddress()).append(":").append(getPort()).append("]"); + buf.append(getHost()).append(":").append(getPort()).append("]"); return buf.toString(); } - // ----------------------------------------------------- Inner Class - - private class SenderQueue { - private int limit = 25; - - PooledSocketSender parent = null; - - private LinkedList queue = new LinkedList(); - - private LinkedList inuse = new LinkedList(); - - private Object mutex = new Object(); - - private boolean isOpen = true; - - public SenderQueue(PooledSocketSender parent, int limit) { - this.limit = limit; - this.parent = parent; - } - - /** - * @return Returns the limit. - */ - public int getLimit() { - return limit; - } - /** - * @param limit The limit to set. - */ - public void setLimit(int limit) { - this.limit = limit; - } - /** - * @return - */ - public int getInUsePoolSize() { - return inuse.size(); - } - - /** - * @return - */ - public int getInPoolSize() { - return queue.size(); - } - - public SinglePointDataSender getSender(long timeout) { - SinglePointDataSender sender = null; - long start = System.currentTimeMillis(); - long delta = 0; - do { - synchronized (mutex) { - if (!isOpen) - throw new IllegalStateException( - "Socket pool is closed."); - if (queue.size() > 0) { - sender = (SinglePointDataSender) queue.removeFirst(); - } else if (inuse.size() < limit) { - sender = getNewDataSender(); - } else { - try { - mutex.wait(timeout); - } catch (Exception x) { - PooledSocketSender.log.warn(sm.getString("PoolSocketSender.senderQueue.sender.failed",parent.getAddress(),new Integer(parent.getPort())),x); - }//catch - }//end if - if (sender != null) { - inuse.add(sender); - } - }//synchronized - delta = System.currentTimeMillis() - start; - } while ((isOpen) && (sender == null) - && (timeout == 0 ? true : (delta < timeout))); - //to do - return sender; - } - - public void returnSender(SinglePointDataSender sender) { - //to do - synchronized (mutex) { - queue.add(sender); - inuse.remove(sender); - mutex.notify(); - } - } - - private SinglePointDataSender getNewDataSender() { - //new DataSender( + public DataSender getNewDataSender() { + //new DataSender( SinglePointDataSender sender = new SinglePointDataSender(getDomain(), - parent.getAddress(), - parent.getPort(), - parent.getSenderState() ); - sender.setKeepAliveMaxRequestCount(parent.getKeepAliveMaxRequestCount()); - sender.setKeepAliveTimeout(parent.getKeepAliveTimeout()); - sender.setAckTimeout(parent.getAckTimeout()); - sender.setWaitForAck(parent.getWaitForAck()); - sender.setResend(parent.isResend()); - sender.setRxBufSize(parent.getRxBufSize()); - sender.setTxBufSize(parent.getTxBufSize()); + getHost(), + getPort(), + getSenderState() ); + sender.setKeepAliveMaxRequestCount(getKeepAliveMaxRequestCount()); + sender.setKeepAliveTimeout(getKeepAliveTimeout()); + sender.setAckTimeout(getAckTimeout()); + sender.setWaitForAck(getWaitForAck()); + sender.setResend(isResend()); + sender.setRxBufSize(getRxBufSize()); + sender.setTxBufSize(getTxBufSize()); return sender; - } - - public void close() { - synchronized (mutex) { - for (int i = 0; i < queue.size(); i++) { - SinglePointDataSender sender = (SinglePointDataSender) queue.get(i); - sender.disconnect(); - }//for - for (int i = 0; i < inuse.size(); i++) { - SinglePointDataSender sender = (SinglePointDataSender) inuse.get(i); - sender.disconnect(); - }//for - queue.clear(); - inuse.clear(); - isOpen = false; - mutex.notifyAll(); - } - } - - public void open() { - synchronized (mutex) { - isOpen = true; - mutex.notifyAll(); - } - } } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=382509&r1=382508&r2=382509&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar 2 12:53:55 2006 @@ -17,16 +17,16 @@ import java.io.IOException; +import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.HashMap; +import java.util.Iterator; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.io.ClusterData; import org.apache.catalina.tribes.io.XByteBuffer; -import java.util.Iterator; -import java.nio.channels.SelectionKey; import org.apache.catalina.tribes.tcp.MultiPointSender; /** @@ -53,7 +53,8 @@ protected int rxBufSize = 43800; protected int txBufSize = 25188; protected boolean suspect = false; - + private boolean connected; + public ParallelNioSender(long timeout, boolean waitForAck, int retryAttempts, @@ -189,6 +190,7 @@ public void connect() { //do nothing, we connect on demand + setConnected(true); } @@ -211,6 +213,7 @@ public synchronized void disconnect() { try {close(); }catch (Exception x){} + setConnected(false); } public void finalize() { @@ -220,6 +223,11 @@ public boolean getSuspect() { return suspect; } + + public boolean isConnected() { + return connected; + } + public void setSuspect(boolean suspect) { this.suspect = suspect; } @@ -247,7 +255,9 @@ public void setTimeout(long timeout) { this.timeout = timeout; } - - + + public void setConnected(boolean connected) { + this.connected = connected; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]