Author: fhanik
Date: Mon Feb 27 15:01:57 2006
New Revision: 381477

URL: http://svn.apache.org/viewcvs?rev=381477&view=rev
Log:
Added in the ability to use direct byte buffers on the ReplicationListener, 
didn't notice much difference

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381477&r1=381476&r2=381477&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
 Mon Feb 27 15:01:57 2006
@@ -383,6 +383,7 @@
         channel.setChannelListener(test);
         channel.setMembershipListener(test);
         channel.start(channel.DEFAULT);
+        Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
         while ( threads > 1 ) {
             Thread t = new Thread(test);
             t.setDaemon(true);
@@ -394,6 +395,24 @@
         
         System.out.println("System test complete, sleeping to let threads 
finish.");
         Thread.sleep(60*1000*60);
-    }    
+    } 
+    
+    public static class Shutdown extends Thread {
+        ManagedChannel channel = null;
+        public Shutdown(ManagedChannel channel) {
+            this.channel = channel;
+        }
+        
+        public void run() {
+            System.out.println("Shutting down...");
+            try {
+                channel.stop(channel.DEFAULT);
+                
+            }catch ( Exception x ) {
+                x.printStackTrace();
+            }
+            System.out.println("Channel stopped.");
+        }
+    }
     
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=381477&r1=381476&r2=381477&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
 Mon Feb 27 15:01:57 2006
@@ -80,6 +80,13 @@
      * @return number of messages that sended to callback
      * @throws java.io.IOException
      */
+    public int append(ByteBuffer data, int len, boolean count) throws 
java.io.IOException {
+       buffer.append(data,len);
+       int pkgCnt = -1;
+       if ( count ) pkgCnt = buffer.countPackages();
+       return pkgCnt;
+   }
+
      public int append(byte[] data,int off,int len, boolean count) throws 
java.io.IOException {
         buffer.append(data,off,len);
         int pkgCnt = -1;

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381477&r1=381476&r2=381477&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 Mon Feb 27 15:01:57 2006
@@ -29,6 +29,7 @@
 import java.io.Serializable;
 import org.apache.catalina.tribes.Member;
 import java.util.UUID;
+import java.nio.ByteBuffer;
 
 /**
  * The XByteBuffer provides a dual functionality.
@@ -66,12 +67,12 @@
     /**
      * Default size on the initial byte buffer
      */
-    static final int DEF_SIZE = 1024;
+    public static final int DEF_SIZE = 2048;
  
     /**
      * Default size to extend the buffer with
      */
-    static final int DEF_EXT  = 1024;
+    public static final int DEF_EXT  = 1024;
     
     /**
      * Variable to hold the data
@@ -122,6 +123,27 @@
      * @param len - the number of bytes to append.
      * @return true if the data was appended correctly. Returns false if the 
package is incorrect, ie missing header or something, or the length of data is 0
      */
+    public boolean append(ByteBuffer b, int len) {
+        int newcount = bufSize + len;
+        if (newcount > buf.length) {
+            //don't change the allocation strategy
+            byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+            System.arraycopy(buf, 0, newbuf, 0, bufSize);
+            buf = newbuf;
+        }
+        b.get(buf,bufSize,len);
+        
+        bufSize = newcount;
+
+        if (bufSize > START_DATA.length && 
(firstIndexOf(buf,0,START_DATA)==-1)){
+            bufSize = 0;
+            log.error("Discarded the package, invalid header");
+            return false;
+        }
+        return true;
+
+    }
+
     public boolean append(byte[] b, int off, int len) {
         if ((off < 0) || (off > b.length) || (len < 0) ||
             ((off + len) > b.length) || ((off + len) < 0))  {
@@ -132,6 +154,7 @@
 
         int newcount = bufSize + len;
         if (newcount > buf.length) {
+            //don't change the allocation strategy
             byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
             System.arraycopy(buf, 0, newbuf, 0, bufSize);
             buf = newbuf;

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java?rev=381477&r1=381476&r2=381477&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
 Mon Feb 27 15:01:57 2006
@@ -31,6 +31,7 @@
 import org.apache.catalina.tribes.MessageListener;
 import org.apache.catalina.tribes.io.ListenCallback;
 import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.util.StringManager;
 
 /**
@@ -38,10 +39,17 @@
  * @author Peter Rossbach
  * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 
2006) $
  */
-public class ReplicationListener
-    implements Runnable, ChannelReceiver, ListenCallback {
-    protected static org.apache.commons.logging.Log log =
-        
org.apache.commons.logging.LogFactory.getLog(ReplicationListener.class);
+public class ReplicationListener implements Runnable, ChannelReceiver, 
ListenCallback {
+    /**
+     * @todo make this configurable
+     */
+    public static int BUFFER_RECEIVE_SIZE = XByteBuffer.DEF_SIZE;
+    /**
+     * We are only sending acks
+     */
+    public static int BUFFER_SEND_SIZE = 128;
+
+    protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(ReplicationListener.class);
 
     /**
      * The string manager for this package.
@@ -63,15 +71,13 @@
     private int tcpListenPort;
     private boolean sendAck;
     protected boolean doListen = false;
-    /**
-     * Compress message data bytes
-     */
-    private boolean compress = true;
+    
 
 
     private Object interestOpsMutex = new Object();
     private MessageListener listener = null;
     private boolean sync;
+    private boolean direct;
     public ReplicationListener() {
     }
 
@@ -185,8 +191,9 @@
                         ServerSocketChannel server =
                             (ServerSocketChannel) key.channel();
                         SocketChannel channel = server.accept();
-                        Object attach = new ObjectReader(channel, selector,
-                            this);
+                        
channel.socket().setReceiveBufferSize(BUFFER_RECEIVE_SIZE);
+                        channel.socket().setSendBufferSize(BUFFER_SEND_SIZE);
+                        Object attach = new ObjectReader(channel, 
selector,this);
                         registerChannel(selector,
                                         channel,
                                         SelectionKey.OP_READ,
@@ -328,20 +335,6 @@
     }
 
     /**
-     * @return Returns the compress.
-     */
-    public boolean isCompress() {
-        return compress;
-    }
-
-    /**
-     * @param compressMessageData The compress to set.
-     */
-    public void setCompress(boolean compressMessageData) {
-        this.compress = compressMessageData;
-    }
-
-    /**
      * Send ACK to sender
      *
      * @return True if sending ACK
@@ -375,6 +368,10 @@
         return sync;
     }
 
+    public boolean getDirect() {
+        return direct;
+    }
+
     public MessageListener getMessageListener() {
         return listener;
     }
@@ -383,6 +380,9 @@
         this.tcpListenPort = tcpListenPort;
     }
 
+    public void setDirect(boolean direct) {
+        this.direct = direct;
+    }
 
     public void setSynchronized(boolean sync) {
         this.sync = sync;
@@ -396,6 +396,7 @@
         int options = 0;
         if ( getSynchronized() ) options = options 
|TcpReplicationThread.OPTION_SYNCHRONIZED;
         if ( getSendAck() ) options = options 
|TcpReplicationThread.OPTION_SEND_ACK;
+        if ( getDirect() ) options = options | 
TcpReplicationThread.OPTION_DIRECT_BUFFER;
         return options;
     }
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java?rev=381477&r1=381476&r2=381477&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
 Mon Feb 27 15:01:57 2006
@@ -39,11 +39,12 @@
 public class TcpReplicationThread extends WorkerThread {
     public static final int OPTION_SEND_ACK = 0x0001;
     public static final int OPTION_SYNCHRONIZED = 0x0002;
+    public static final int OPTION_DIRECT_BUFFER = 0x0004;
 
     public static final byte[] ACK_COMMAND = new byte[] {6, 2, 3};
     private static org.apache.commons.logging.Log log =
         org.apache.commons.logging.LogFactory.getLog( 
TcpReplicationThread.class );
-    private ByteBuffer buffer = ByteBuffer.allocate (1024);
+    private ByteBuffer buffer = null;
     private SelectionKey key;
     TcpReplicationThread ()
     {
@@ -52,6 +53,13 @@
     // loop forever waiting for work to do
     public synchronized void run()
     {
+        if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
+            System.out.println("Creating a direct buffer");
+            buffer = 
ByteBuffer.allocateDirect(ReplicationListener.BUFFER_RECEIVE_SIZE);
+        }else {
+            System.out.println("Creating a regular buffer");
+            buffer = ByteBuffer.allocate 
(ReplicationListener.BUFFER_RECEIVE_SIZE);
+        }
         while (doRun) {
             try {
                 // sleep and release object lock
@@ -131,7 +139,12 @@
         // loop while data available, channel is non-blocking
         while ((count = channel.read (buffer)) > 0) {
             buffer.flip();             // make buffer readable
-            reader.append(buffer.array(),0,count,false);
+            if ( buffer.hasArray() ) 
+                reader.append(buffer.array(),0,count,false);
+            else 
+                reader.append(buffer,count,false);
+            
+            
             buffer.clear();            // make buffer empty
         }
         



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to