Author: fhanik
Date: Mon Feb 27 10:22:16 2006
New Revision: 381403

URL: http://svn.apache.org/viewcvs?rev=381403&view=rev
Log:
Initial cleanup, getting ready to create a NIO data sender for faster 
throughput.

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=381403&r1=381402&r2=381403&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 Mon Feb 27 10:22:16 2006
@@ -34,25 +34,23 @@
  * @author Peter Rossbach
  * @author Filip Hanik
  * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 
2006) $
- * @since 5.5.7
+ * @since 5.5.16
  */
 public class DataSender implements IDataSender {
 
-    private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory
-            .getLog(DataSender.class);
+    private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(DataSender.class);
 
     /**
      * The string manager for this package.
      */
-    protected static StringManager sm = StringManager
-            .getManager(Constants.Package);
+    protected static StringManager sm = 
StringManager.getManager(Constants.Package);
 
     // ----------------------------------------------------- Instance Variables
 
     /**
      * The descriptive information about this implementation.
      */
-    private static final String info = "DataSender/2.1";
+    private static final String info = "DataSender/3.0";
 
     /**
      * receiver address
@@ -678,20 +676,17 @@
            return ;
        try {
             createSocket();
-            if (isWaitForAck())
-                socket.setSoTimeout((int) ackTimeout);
+            if (isWaitForAck()) socket.setSoTimeout((int) ackTimeout);
             isSocketConnected = true;
             socketOpenCounter++;
             this.keepAliveCount = 0;
             this.keepAliveConnectTime = System.currentTimeMillis();
             if (log.isDebugEnabled())
-                log.debug(sm.getString("IDataSender.openSocket", address
-                        .getHostAddress(), new Integer(port),new 
Long(socketOpenCounter)));
+                log.debug(sm.getString("IDataSender.openSocket", 
address.getHostAddress(), new Integer(port),new Long(socketOpenCounter)));
       } catch (IOException ex1) {
             socketOpenFailureCounter++ ;
             if (log.isDebugEnabled())
-                log.debug(sm.getString("IDataSender.openSocket.failure",
-                        address.getHostAddress(), new Integer(port),new 
Long(socketOpenFailureCounter)), ex1);
+                
log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(),
 new Integer(port),new Long(socketOpenFailureCounter)), ex1);
             throw ex1;
         }
         
@@ -725,8 +720,7 @@
             isSocketConnected = false;
             socketCloseCounter++;
             if (log.isDebugEnabled())
-                log.debug(sm.getString("IDataSender.closeSocket",
-                        address.getHostAddress(), new Integer(port),new 
Long(socketCloseCounter)));
+                
log.debug(sm.getString("IDataSender.closeSocket",address.getHostAddress(), new 
Integer(port),new Long(socketCloseCounter)));
        }
     }
 
@@ -791,62 +785,48 @@
      * @throws java.io.IOException
      * @since 5.5.10
      */
-    protected void pushMessage( ChannelMessage data)
-            throws java.io.IOException {
-        long time = 0 ;
-        if(doProcessingStats) {
-            time = System.currentTimeMillis();
-        }
-        boolean messageTransfered = false ;
+    
+    protected void pushMessage(ChannelMessage data, boolean reconnect) throws 
java.io.IOException {
         synchronized(this) {
             checkKeepAlive();
-            if (!isConnected())
-                openSocket();
-            else if(keepAliveTimeout > -1)
-                this.keepAliveConnectTime = System.currentTimeMillis();
+            if ( reconnect ) closeSocket();
+            if (!isConnected()) openSocket();
+            else if(keepAliveTimeout > -1) this.keepAliveConnectTime = 
System.currentTimeMillis();
         }
+        writeData(data);
+        
+    }
+    
+    protected void pushMessage( ChannelMessage data) throws 
java.io.IOException {
+        long time = 0 ;
+        if(doProcessingStats) time = System.currentTimeMillis();
+        boolean messageTransfered = false ;
         IOException exception = null;
         try {
-             writeData(data);
+             // first try with existing connection
+             pushMessage(data,false);
              messageTransfered = true ;
         } catch (java.io.IOException x) {
             exception = x;
-            if( true ) { //allow resend
+            //resend
+            dataResendCounter++;
+            if (log.isTraceEnabled()) 
log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new 
Integer(port)),x);
+            try {
                 // second try with fresh connection
-                dataResendCounter++;
-                if (log.isTraceEnabled())
-                    log.trace(sm.getString("IDataSender.send.again", 
address.getHostAddress(),
-                            new Integer(port)),x);
-                synchronized(this) {
-                    closeSocket();
-                    openSocket();
-                }
-                try {
-                    writeData(data);
-                    messageTransfered = true;
-                    exception = null;
-                } catch (IOException xx) {
-                    xx.fillInStackTrace();
-                    exception = xx;
-                }
-            } else 
-            {
-                synchronized(this) {
-                    closeSocket();
-                }
+                pushMessage(data,true);                    
+                messageTransfered = true;
+                exception = null;
+            } catch (IOException xx) {
+                exception = xx;
+                closeSocket();
             }
         } finally {
             this.keepAliveCount++;
             checkKeepAlive();
-            if(doProcessingStats) {
-                addProcessingStats(time);
-            }
+            if(doProcessingStats) addProcessingStats(time);
             if(messageTransfered) {
                 addStats(data.getMessage().length);
-                if (log.isTraceEnabled()) {
-                    log.trace(sm.getString("IDataSender.send.message", 
address.getHostAddress(),
-                        new Integer(port), data.getUniqueId(), new 
Long(data.getMessage().length)));
-                }
+                if (log.isTraceEnabled()) 
log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new 
Integer(port), data.getUniqueId(), new Long(data.getMessage().length)));
             } else {
                 dataFailureCounter++;
                 if ( exception != null ) throw exception;
@@ -869,8 +849,7 @@
             OutputStream out = socket.getOutputStream();
             out.write(XByteBuffer.createDataPackage((ClusterData)data));
             out.flush();
-            if (isWaitForAck())
-                waitForAck(ackTimeout);
+            if (isWaitForAck()) waitForAck(ackTimeout);
         } finally {
             synchronized(this) {
                 isMessageTransferStarted = false ;
@@ -892,31 +871,20 @@
         }
         try {
             int bytesRead = 0;
-            if ( log.isTraceEnabled() ) 
-                log.trace(sm.getString("IDataSender.ack.start",getAddress(), 
new Integer(socket.getLocalPort())));
+            if ( log.isTraceEnabled() ) 
log.trace(sm.getString("IDataSender.ack.start",getAddress(), new 
Integer(socket.getLocalPort())));
             int i = socket.getInputStream().read();
             while ((i != -1) && (i != 3) && bytesRead < 10) {
-                if ( log.isTraceEnabled() ) 
-                    
log.trace(sm.getString("IDataSender.ack.read",getAddress(), new 
Integer(socket.getLocalPort()),new Character((char) i)));
+                if ( log.isTraceEnabled() )  
log.trace(sm.getString("IDataSender.ack.read",getAddress(), new 
Integer(socket.getLocalPort()),new Character((char) i)));
                 bytesRead++;
                 i = socket.getInputStream().read();
             }
             if (i != 3) {
-                if (i == -1) {
-                    throw new 
IOException(sm.getString("IDataSender.ack.eof",getAddress(), new 
Integer(socket.getLocalPort())));
-                } else {
-                    throw new 
IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new 
Integer(socket.getLocalPort())));
-                }
-            } else {
-                if (log.isTraceEnabled()) {
-                    log.trace(sm.getString("IDataSender.ack.receive", 
getAddress(),new Integer(socket.getLocalPort())));
-                }
-            }
+                if (i == -1) throw new 
IOException(sm.getString("IDataSender.ack.eof",getAddress(), new 
Integer(socket.getLocalPort())));
+                else throw new 
IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new 
Integer(socket.getLocalPort())));
+            } else if (log.isTraceEnabled()) 
log.trace(sm.getString("IDataSender.ack.receive", getAddress(),new 
Integer(socket.getLocalPort())));
         } catch (IOException x) {
             missingAckCounter++;
-            String errmsg = sm.getString("IDataSender.ack.missing", 
getAddress(),
-                                         new Integer(socket.getLocalPort()), 
-                                         new Long(this.ackTimeout));
+            String errmsg = sm.getString("IDataSender.ack.missing", 
getAddress(),new Integer(socket.getLocalPort()), new Long(this.ackTimeout));
             if ( !this.isSuspect() ) {
                 this.setSuspect(true);
                 if ( log.isWarnEnabled() ) log.warn(errmsg, x);
@@ -925,9 +893,7 @@
             }
             throw x;
         } finally {
-            if(doWaitAckStats) {
-                addWaitAckStats(time);
-            }
+            if(doWaitAckStats) addWaitAckStats(time);
         }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to