Author: fhanik
Date: Thu Mar  2 12:53:55 2006
New Revision: 382509

URL: http://svn.apache.org/viewcvs?rev=382509&view=rev
Log:
more refactoring

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=382509&r1=382508&r2=382509&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Thu Mar  2 12:53:55 2006
@@ -32,5 +32,8 @@
 public interface DataSender {
     public void connect() throws ChannelException;
     public void disconnect();
+    public boolean isConnected();
+    public void setRxBufSize(int size);
+    public void setTxBufSize(int size);
 
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java?rev=382509&r1=382508&r2=382509&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java Thu Mar  2 12:53:55 2006
@@ -22,6 +22,9 @@
 
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.tcp.PooledSender;
+import org.apache.catalina.tribes.tcp.DataSender;
+import org.apache.catalina.tribes.tcp.SenderState;
 
 /**
  * Send cluster messages with a pool of sockets (25).
@@ -31,7 +34,7 @@
  * @version 1.2
  */
 
-public class PooledSocketSender extends SinglePointDataSender {
+public class PooledSocketSender extends PooledSender {
 
     private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
             .getLog(org.apache.catalina.tribes.tcp.bio.PooledSocketSender.class);
@@ -44,9 +47,16 @@
     // ----------------------------------------------------- Instance Variables
 
     private int maxPoolSocketLimit = 25;
+    private String domain;
+    private InetAddress host;
+    private int port;
+    private SenderState senderState;
+    private int keepAliveMaxRequestCount;
+    private long keepAliveTimeout;
+    private long ackTimeout;
+    private boolean resend;
+    private boolean waitForAck;
 
-    private SenderQueue senderQueue = null;
-    
     //  ----------------------------------------------------- Constructor
 
    /**
@@ -54,9 +64,12 @@
     * @param host replication node tcp address
     * @param port replication node tcp port
     */
-    public PooledSocketSender(String domain,InetAddress host, int port) {
-        super(domain,host, port);
-        senderQueue = new SenderQueue(this, maxPoolSocketLimit);
+    public PooledSocketSender(String domain,InetAddress host, int port, int poolSize) {
+        super(poolSize);
+        this.host = host;
+        this.domain = domain;
+        this.port = port;
+        this.maxPoolSocketLimit = poolSize;
     }
    
     //  ----------------------------------------------------- Public Properties
@@ -72,36 +85,82 @@
 
     }
 
-    public void setMaxPoolSocketLimit(int limit) {
-        maxPoolSocketLimit = limit;
-        senderQueue.setLimit(limit);
+    public void setDomain(String domain) {
+        this.domain = domain;
     }
 
-    public int getMaxPoolSocketLimit() {
-        return maxPoolSocketLimit;
+    public void setHost(InetAddress host) {
+        this.host = host;
     }
 
-    public int getInPoolSize() {
-        return senderQueue.getInPoolSize();
+    public void setPort(int port) {
+        this.port = port;
     }
 
-    public int getInUsePoolSize() {
-        return senderQueue.getInUsePoolSize();
+    public void setSenderState(SenderState senderState) {
+        this.senderState = senderState;
     }
 
-    //  ----------------------------------------------------- Public Methode
+    public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
+        this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
+    }
+
+    public void setKeepAliveTimeout(long keepAliveTimeout) {
+        this.keepAliveTimeout = keepAliveTimeout;
+    }
+
+    public void setAckTimeout(long ackTimeout) {
+        this.ackTimeout = ackTimeout;
+    }
+
+    public void setResend(boolean resend) {
+        this.resend = resend;
+    }
+
+    public void setWaitForAck(boolean waitForAck) {
+        this.waitForAck = waitForAck;
+    }
+
+    public String getDomain() {
+        return domain;
+    }
+
+    public InetAddress getHost() {
+        return host;
+    }
 
-    public synchronized void connect() throws ChannelException {
-        //do nothing, happens in the socket sender itself
-        senderQueue.open();
-        setSocketConnected(true);
+    public int getPort() {
+        return port;
     }
 
-    public synchronized void disconnect() {
-        senderQueue.close();
-        setSocketConnected(false);
+    public SenderState getSenderState() {
+        return senderState;
     }
 
+    public int getKeepAliveMaxRequestCount() {
+        return keepAliveMaxRequestCount;
+    }
+
+    public long getKeepAliveTimeout() {
+        return keepAliveTimeout;
+    }
+
+    public long getAckTimeout() {
+        return ackTimeout;
+    }
+
+    public boolean isResend() {
+        return resend;
+    }
+
+    public boolean getWaitForAck() {
+        return waitForAck;
+    }
+
+    //  ----------------------------------------------------- Public Methode
+
+    
+
     /**
      * send message and use a pool of DataSenders
      * 
@@ -117,9 +176,9 @@
                     connect();
             }
         }
-        SinglePointDataSender sender = senderQueue.getSender(0);
+        SinglePointDataSender sender = (SinglePointDataSender)getSender();
         if (sender == null) {
-            log.warn(sm.getString("PoolSocketSender.noMoreSender", this.getAddress(), new Integer(this.getPort())));
+            log.warn("Sender queue is empty. Can not send any messages.");
             return;
         }
         //send the message
@@ -127,141 +186,30 @@
             sender.sendMessage(data);
         } finally {
             //return the connection to the pool
-            senderQueue.returnSender(sender);
+            returnSender(sender);
         }
     }
 
     public String toString() {
         StringBuffer buf = new StringBuffer("PooledSocketSender[");
-        buf.append(getAddress()).append(":").append(getPort()).append("]");
+        buf.append(getHost()).append(":").append(getPort()).append("]");
         return buf.toString();
     }
 
-    //  ----------------------------------------------------- Inner Class
-
-    private class SenderQueue {
-        private int limit = 25;
-
-        PooledSocketSender parent = null;
-
-        private LinkedList queue = new LinkedList();
-
-        private LinkedList inuse = new LinkedList();
-
-        private Object mutex = new Object();
-
-        private boolean isOpen = true;
-
-        public SenderQueue(PooledSocketSender parent, int limit) {
-            this.limit = limit;
-            this.parent = parent;
-        }
-
-        /**
-         * @return Returns the limit.
-         */
-        public int getLimit() {
-            return limit;
-        }
-        /**
-         * @param limit The limit to set.
-         */
-        public void setLimit(int limit) {
-            this.limit = limit;
-        }
-        /**
-         * @return
-         */
-        public int getInUsePoolSize() {
-            return inuse.size();
-        }
-
-        /**
-         * @return
-         */
-        public int getInPoolSize() {
-            return queue.size();
-        }
-
-        public SinglePointDataSender getSender(long timeout) {
-            SinglePointDataSender sender = null;
-            long start = System.currentTimeMillis();
-            long delta = 0;
-            do {
-                synchronized (mutex) {
-                    if (!isOpen)
-                        throw new IllegalStateException(
-                                "Socket pool is closed.");
-                    if (queue.size() > 0) {
-                        sender = (SinglePointDataSender) queue.removeFirst();
-                    } else if (inuse.size() < limit) {
-                        sender = getNewDataSender();
-                    } else {
-                        try {
-                            mutex.wait(timeout);
-                        } catch (Exception x) {
-                            PooledSocketSender.log.warn(sm.getString("PoolSocketSender.senderQueue.sender.failed",parent.getAddress(),new Integer(parent.getPort())),x);
-                        }//catch
-                    }//end if
-                    if (sender != null) {
-                        inuse.add(sender);
-                    }
-                }//synchronized
-                delta = System.currentTimeMillis() - start;
-            } while ((isOpen) && (sender == null)
-                    && (timeout == 0 ? true : (delta < timeout)));
-            //to do
-            return sender;
-        }
-
-        public void returnSender(SinglePointDataSender sender) {
-            //to do
-            synchronized (mutex) {
-                queue.add(sender);
-                inuse.remove(sender);
-                mutex.notify();
-            }
-        }
-
-        private SinglePointDataSender getNewDataSender() {
-            //new DataSender(
+    public DataSender getNewDataSender() {
+        //new DataSender(
             SinglePointDataSender sender = new SinglePointDataSender(getDomain(),
-                                               parent.getAddress(),
-                                               parent.getPort(),
-                                               parent.getSenderState() );
-            sender.setKeepAliveMaxRequestCount(parent.getKeepAliveMaxRequestCount());
-            sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
-            sender.setAckTimeout(parent.getAckTimeout());
-            sender.setWaitForAck(parent.getWaitForAck());
-            sender.setResend(parent.isResend());
-            sender.setRxBufSize(parent.getRxBufSize());
-            sender.setTxBufSize(parent.getTxBufSize());
+                                               getHost(),
+                                               getPort(),
+                                               getSenderState() );
+            sender.setKeepAliveMaxRequestCount(getKeepAliveMaxRequestCount());
+            sender.setKeepAliveTimeout(getKeepAliveTimeout());
+            sender.setAckTimeout(getAckTimeout());
+            sender.setWaitForAck(getWaitForAck());
+            sender.setResend(isResend());
+            sender.setRxBufSize(getRxBufSize());
+            sender.setTxBufSize(getTxBufSize());
             return sender;
 
-        }
-
-        public void close() {
-            synchronized (mutex) {
-                for (int i = 0; i < queue.size(); i++) {
-                    SinglePointDataSender sender = (SinglePointDataSender) queue.get(i);
-                    sender.disconnect();
-                }//for
-                for (int i = 0; i < inuse.size(); i++) {
-                    SinglePointDataSender sender = (SinglePointDataSender) inuse.get(i);
-                    sender.disconnect();
-                }//for
-                queue.clear();
-                inuse.clear();
-                isOpen = false;
-                mutex.notifyAll();
-            }
-        }
-
-        public void open() {
-            synchronized (mutex) {
-                isOpen = true;
-                mutex.notifyAll();
-            }
-        }
     }
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=382509&r1=382508&r2=382509&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar  2 12:53:55 2006
@@ -17,16 +17,16 @@
 
 
 import java.io.IOException;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.HashMap;
+import java.util.Iterator;
 
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.io.XByteBuffer;
-import java.util.Iterator;
-import java.nio.channels.SelectionKey;
 import org.apache.catalina.tribes.tcp.MultiPointSender;
 
 /**
@@ -53,7 +53,8 @@
     protected int rxBufSize = 43800;
     protected int txBufSize = 25188;
     protected boolean suspect = false;
-    
+    private boolean connected;
+
     public ParallelNioSender(long timeout, 
                              boolean waitForAck,
                              int retryAttempts,
@@ -189,6 +190,7 @@
     
     public void connect() {
         //do nothing, we connect on demand
+        setConnected(true);
     }
     
     
@@ -211,6 +213,7 @@
     
     public synchronized void disconnect() {
         try {close(); }catch (Exception x){}
+        setConnected(false);
     }
     
     public void finalize() {
@@ -220,6 +223,11 @@
     public boolean getSuspect() {
         return suspect;
     }
+
+    public boolean isConnected() {
+        return connected;
+    }
+
     public void setSuspect(boolean suspect) {
         this.suspect = suspect;
     }
@@ -247,7 +255,9 @@
     public void setTimeout(long timeout) {
         this.timeout = timeout;
     }
-    
-    
+
+    public void setConnected(boolean connected) {
+        this.connected = connected;
+    }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to