Author: fhanik
Date: Mon Feb 18 14:07:09 2008
New Revision: 628881

URL: http://svn.apache.org/viewvc?rev=628881&view=rev
Log:
Starting to add in UDP support, still need to rethink how the sender is going 
to work

Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/Channel.java
    tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
    
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
    
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java

Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Channel.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/Channel.java Mon Feb 18 
14:07:09 2008
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -56,20 +56,20 @@
  *                                  |
  *                             Coordinator (implements 
MessageListener,MembershipListener,ChannelInterceptor)
  *                          --------------------
- *                         /        |           \ 
+ *                         /        |           \
  *                        /         |            \
  *                       /          |             \
  *                      /           |              \
  *                     /            |               \
  *           MembershipService ChannelSender ChannelReceiver                   
     [IO layer]
  * </code></pre>
- * 
+ *
  * For example usage @see org.apache.catalina.tribes.group.GroupChannel
  * @author Filip Hanik
  * @version $Revision$, $Date$
  */
 public interface Channel {
-    
+
     /**
      * Start and stop sequences can be controlled by these constants
      * This allows you to start separate components of the channel <br>
@@ -119,7 +119,7 @@
      * @see #stop(int)
      */
     public static final int MBR_TX_SEQ = 8;
-    
+
     /**
      * Send options, when a message is sent, it can have an option flag
      * to trigger certain behavior. Most flags are used to trigger channel 
interceptors
@@ -127,7 +127,7 @@
      * However, there are five default flags that every channel implementation 
must implement<br>
      * SEND_OPTIONS_BYTE_MESSAGE - The message is a pure byte message and no 
marshalling or unmarshalling will
      * be performed.<br>
-     * 
+     *
      * @see #send(Member[], Serializable , int)
      * @see #send(Member[], Serializable, int, ErrorHandler)
      */
@@ -150,27 +150,27 @@
      * to trigger certain behavior. Most flags are used to trigger channel 
interceptors
      * as the message passes through the channel stack. <br>
      * However, there are five default flags that every channel implementation 
must implement<br>
-     * SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received 
when the message has been received and 
+     * SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received 
when the message has been received and
      * processed by the recipient<br>
      * If no ack is received, the message is not considered successful<br>
      * @see #send(Member[], Serializable , int)
      * @see #send(Member[], Serializable, int, ErrorHandler)
      */
     public static final int SEND_OPTIONS_SYNCHRONIZED_ACK = 0x0004;
-    
+
     /**
      * Send options, when a message is sent, it can have an option flag
      * to trigger certain behavior. Most flags are used to trigger channel 
interceptors
      * as the message passes through the channel stack. <br>
      * However, there are five default flags that every channel implementation 
must implement<br>
-     * SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when 
the message has been received and 
+     * SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when 
the message has been received and
      * processed by the recipient<br>
      * If no ack is received, the message is not considered successful<br>
      * @see #send(Member[], Serializable , int)
      * @see #send(Member[], Serializable, int, ErrorHandler)
      */
     public static final int SEND_OPTIONS_ASYNCHRONOUS = 0x0008;
-    
+
     /**
      * Send options, when a message is sent, it can have an option flag
      * to trigger certain behavior. Most flags are used to trigger channel 
interceptors
@@ -181,7 +181,14 @@
      * @see #send(Member[], Serializable, int, ErrorHandler)
      */
     public static final int SEND_OPTIONS_SECURE = 0x0010;
-    
+
+    /**
+     * Send options. When a message is sent with this flag on
+     * the system sends the message using UDP instead of TCP
+     * @see #send(Member[], Serializable , int)
+     * @see #send(Member[], Serializable, int, ErrorHandler)
+     */
+    public static final int SEND_OPTIONS_UDP =  0x0020;
 
     /**
      * Send options, when a message is sent, it can have an option flag
@@ -196,13 +203,13 @@
      */
     public static final int SEND_OPTIONS_DEFAULT = SEND_OPTIONS_USE_ACK;
 
-    
+
     /**
      * Adds an interceptor to the channel message chain.
      * @param interceptor ChannelInterceptor
      */
     public void addInterceptor(ChannelInterceptor interceptor);
-    
+
     /**
      * Starts up the channel. This can be called multiple times for individual 
services to start
      * The svc parameter can be the logical or value of any constants
@@ -212,7 +219,7 @@
      * MBR_TX_SEQ - starts the membership broadcaster <BR>
      * SND_TX_SEQ - starts the replication transmitter<BR>
      * SND_RX_SEQ - starts the replication receiver<BR>
-     * <b>Note:</b> In order for the membership broadcaster to 
+     * <b>Note:</b> In order for the membership broadcaster to
      * transmit the correct information, it has to be started after the 
replication receiver.
      * @throws ChannelException if a startup error occurs or the service is 
already started or an error occurs.
      */
@@ -229,14 +236,14 @@
      * SND_RX_SEQ - stops the replication receiver<BR>
      * @throws ChannelException if a startup error occurs or the service is 
already stopped or an error occurs.
      */
-    public void stop(int svc) throws ChannelException;    
-    
+    public void stop(int svc) throws ChannelException;
+
     /**
      * Send a message to one or more members in the cluster
      * @param destination Member[] - the destinations, can not be null or zero 
length, the reason for that
      * is that a membership change can occur and at that time the application 
is uncertain what group the message
      * actually got sent to.
-     * @param msg Serializable - the message to send, has to be serializable, 
or a <code>ByteMessage</code> to 
+     * @param msg Serializable - the message to send, has to be serializable, 
or a <code>ByteMessage</code> to
      * send a pure byte array
      * @param options int - sender options, see class documentation for each 
interceptor that is configured in order to trigger interceptors
      * @return a unique Id that identifies the message that is sent
@@ -257,10 +264,10 @@
      * @exception ChannelException - if a serialization error happens.
      */
     public UniqueId send(Member[] destination, Serializable msg, int options, 
ErrorHandler handler) throws ChannelException;
-    
+
     /**
      * Sends a heart beat through the interceptor stacks
-     * Use this method to alert interceptors and other components to 
+     * Use this method to alert interceptors and other components to
      * clean up garbage, timed out messages etc.<br>
      * If you application has a background thread, then you can save one 
thread,
      * by configuring your channel to not use an internal heartbeat thread
@@ -268,14 +275,14 @@
      * @see #setHeartbeat(boolean)
      */
     public void heartbeat();
-    
+
     /**
      * Enables or disables internal heartbeat.
      * @param enable boolean - default value is implementation specific
      * @see #heartbeat()
      */
     public void setHeartbeat(boolean enable);
-    
+
     /**
      * Add a membership listener, will get notified when a new member joins, 
leaves or crashes
      * <br>If the membership listener implements the Heartbeat interface
@@ -284,7 +291,7 @@
      * @see MembershipListener
      */
     public void addMembershipListener(MembershipListener listener);
-    
+
     /**
      * Add a channel listener, this is a callback object when messages are 
received
      * <br>If the channel listener implements the Heartbeat interface
@@ -307,7 +314,7 @@
      * @see ChannelListener
      */
     public void removeChannelListener(ChannelListener listener);
-    
+
     /**
      * Returns true if there are any members in the group,
      * this call is the same as <code>getMembers().length>0</code>
@@ -317,7 +324,7 @@
 
     /**
      * Get all current group members
-     * @return all members or empty array, never null 
+     * @return all members or empty array, never null
      */
     public Member[] getMembers() ;
 
@@ -329,10 +336,10 @@
      * @return Member
      */
     public Member getLocalMember(boolean incAlive);
-    
+
     /**
-     * Returns the member from the membership service with complete and 
-     * recent data. Some implementations might serialize and send 
+     * Returns the member from the membership service with complete and
+     * recent data. Some implementations might serialize and send
      * membership information along with a message, and instead of sending
      * complete membership details, only send the primary identifier for the 
member
      * but not the payload or other information. When such message is received
@@ -343,5 +350,5 @@
      */
     public Member getMember(Member mbr);
 
-    
+
 }

Modified: tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java Mon Feb 
18 14:07:09 2008
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -40,12 +40,15 @@
     protected static org.apache.juli.logging.Log log = 
org.apache.juli.logging.LogFactory.getLog(ObjectReader.class);
 
     private XByteBuffer buffer;
-    
+
     protected long lastAccess = System.currentTimeMillis();
-    
+
     protected boolean accessed = false;
     private boolean cancelled;
 
+    public ObjectReader(int packetSize) {
+        this.buffer = new XByteBuffer(packetSize, true);
+    }
     /**
      * Creates an <code>ObjectReader</code> for a TCP NIO socket channel
      * @param channel - the channel to be read.
@@ -53,7 +56,7 @@
     public ObjectReader(SocketChannel channel) {
         this(channel.socket());
     }
-    
+
     /**
      * Creates an <code>ObjectReader</code> for a TCP socket
      * @param socket Socket
@@ -67,23 +70,23 @@
             this.buffer = new XByteBuffer(43800,true);
         }
     }
-    
+
     public synchronized void access() {
         this.accessed = true;
         this.lastAccess = System.currentTimeMillis();
     }
-    
+
     public synchronized void finish() {
         this.accessed = false;
         this.lastAccess = System.currentTimeMillis();
     }
-    
+
     public boolean isAccessed() {
         return this.accessed;
     }
 
     /**
-     * Append new bytes to buffer. 
+     * Append new bytes to buffer.
      * @see XByteBuffer#countPackages()
      * @param data new transfer buffer
      * @param off offset
@@ -125,11 +128,11 @@
         }
         return result;
     }
-    
+
     public int bufferSize() {
         return buffer.getLength();
     }
-    
+
 
     public boolean hasPackage() {
         return buffer.countPackages(true)>0;
@@ -141,7 +144,7 @@
     public int count() {
         return buffer.countPackages();
     }
-    
+
     public void close() {
         this.buffer = null;
     }

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java 
Mon Feb 18 14:07:09 2008
@@ -34,7 +34,7 @@
  * @version 1.0
  */
 public abstract class AbstractSender implements DataSender {
-    
+
     private boolean connected = false;
     private int rxBufSize = 25188;
     private int txBufSize = 43800;
@@ -57,7 +57,9 @@
     private int soLingerTime = 3;
     private int soTrafficClass = 0x04 | 0x08 | 0x010;
     private boolean throwOnFailedAck = true;
-    
+    private boolean udpBased = false;
+    private int udpPort = -1;
+
     /**
      * transfers sender properties from one sender to another
      * @param from AbstractSender
@@ -82,13 +84,15 @@
         to.soLingerTime = from.soLingerTime;
         to.soTrafficClass = from.soTrafficClass;
         to.throwOnFailedAck = from.throwOnFailedAck;
-    }   
+        to.udpBased = from.udpBased;
+        to.udpPort = from.udpPort;
+    }
+
 
-    
     public AbstractSender() {
-        
+
     }
-    
+
     /**
      * connect
      *
@@ -117,11 +121,11 @@
         if ( disconnect ) disconnect();
         return disconnect;
     }
-    
+
     protected void setConnected(boolean connected){
         this.connected = connected;
     }
-    
+
     public boolean isConnected() {
         return connected;
     }
@@ -170,7 +174,7 @@
     public int getMaxRetryAttempts() {
         return maxRetryAttempts;
     }
-    
+
     public void setDirect(boolean direct) {
         setDirectBuffer(direct);
     }
@@ -182,7 +186,7 @@
     public boolean getDirect() {
         return getDirectBuffer();
     }
-    
+
     public boolean getDirectBuffer() {
         return this.directBuffer;
     }
@@ -304,6 +308,26 @@
 
     public void setAddress(InetAddress address) {
         this.address = address;
+    }
+
+
+    public boolean isUdpBased() {
+        return udpBased;
+    }
+
+
+    public void setUdpBased(boolean udpBased) {
+        this.udpBased = udpBased;
+    }
+
+
+    public int getUdpPort() {
+        return udpPort;
+    }
+
+
+    public void setUdpPort(int udpPort) {
+        this.udpPort = udpPort;
     }
 
 }

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java 
Mon Feb 18 14:07:09 2008
@@ -18,7 +18,9 @@
 package org.apache.catalina.tribes.transport.nio;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -59,6 +61,7 @@
 
     private Selector selector = null;
     private ServerSocketChannel serverChannel = null;
+    private DatagramChannel datagramChannel = null;
 
     protected LinkedList events = new LinkedList();
 //    private Object interestOpsMutex = new Object();
@@ -110,7 +113,7 @@
             else throw new IOException(x.getMessage());
         }
     }
-    
+
     public AbstractRxTask createRxTask() {
         NioReplicationTask thread = new NioReplicationTask(this,this);
         thread.setUseBufferPool(this.getUseBufferPool());
@@ -118,9 +121,9 @@
         thread.setOptions(getWorkerThreadOptions());
         return thread;
     }
-    
-    
-    
+
+
+
     protected void bind() throws IOException {
         // allocate an unbound server socket channel
         serverChannel = ServerSocketChannel.open();
@@ -135,9 +138,22 @@
         serverChannel.configureBlocking(false);
         // register the ServerSocketChannel with the Selector
         serverChannel.register(selector, SelectionKey.OP_ACCEPT);
-        
+
+        //set up the datagram channel
+        if (this.getUdpPort()>0) {
+            datagramChannel = DatagramChannel.open();
+            datagramChannel.configureBlocking(false);
+            //bind to the address to avoid security checks
+            InetSocketAddress daddr = new 
InetSocketAddress(getBind(),getUdpPort());
+            //TODO should we auto increment the UDP port to avoid collisions?
+            //we could auto increment with the offset from the tcp listen port
+            datagramChannel.connect(daddr);
+        }
+
+
+
     }
-    
+
     public void addEvent(Runnable event) {
         if ( selector != null ) {
             synchronized (events) {
@@ -163,18 +179,18 @@
             events.clear();
         }
     }
-    
+
     public static void cancelledKey(SelectionKey key) {
         ObjectReader reader = (ObjectReader)key.attachment();
         if ( reader != null ) {
             reader.setCancelled(true);
             reader.finish();
         }
-        key.cancel(); 
+        key.cancel();
         key.attach(null);
         try { ((SocketChannel)key.channel()).socket().close(); } catch 
(IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
         try { key.channel().close(); } catch (IOException e) { if 
(log.isDebugEnabled()) log.debug("", e); }
-        
+
     }
     protected long lastCheck = System.currentTimeMillis();
     protected void socketTimeouts() {
@@ -202,7 +218,7 @@
                     if ( ka != null ) {
                         long delta = now - ka.getLastAccess();
                         if (delta > (long) getTimeout() && (!ka.isAccessed())) 
{
-                            if (log.isWarnEnabled()) 
+                            if (log.isWarnEnabled())
                                 log.warn("Channel key is registered, but has 
had no interest ops for the last "+getTimeout()+" ms. 
(cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new 
java.sql.Timestamp(ka.getLastAccess())+" Possible cause: all threads used, 
perform thread dump");
                             ka.setLastAccess(now);
                             //key.interestOps(SelectionKey.OP_READ);
@@ -230,8 +246,12 @@
             log.warn("ServerSocketChannel already started");
             return;
         }
-        
+
         setListen(true);
+        if (selector!=null && datagramChannel!=null) {
+            ObjectReader oreader = new ObjectReader(1024*65);
+            
registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader);
+        }
 
         while (doListen() && selector != null) {
             // this may block for a long time, upon return the
@@ -302,10 +322,18 @@
 
         }
         serverChannel.close();
+        if (datagramChannel!=null) {
+            try {
+                datagramChannel.close();
+            }catch (Exception iox) {
+                if (log.isDebugEnabled()) log.debug("Unable to close datagram 
channel.",iox);
+            }
+            datagramChannel=null;
+        }
         closeSelector();
     }
 
-    
+
 
     /**
      * Close Selector.

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
 (original)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
 Mon Feb 18 14:07:09 2008
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,9 @@
 package org.apache.catalina.tribes.transport.nio;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
@@ -42,15 +45,15 @@
  * serviceChannel() method stores the key reference in the thread object then
  * calls notify() to wake it up. When the channel has been drained, the worker
  * thread returns itself to its parent pool.
- * 
+ *
  * @author Filip Hanik
- * 
+ *
  * @version $Revision$, $Date$
  */
 public class NioReplicationTask extends AbstractRxTask {
-    
+
     private static org.apache.juli.logging.Log log = 
org.apache.juli.logging.LogFactory.getLog( NioReplicationTask.class );
-    
+
     private ByteBuffer buffer = null;
     private SelectionKey key;
     private int rxBufSize;
@@ -62,7 +65,7 @@
     }
 
     // loop forever waiting for work to do
-    public synchronized void run() { 
+    public synchronized void run() {
         if ( buffer == null ) {
             if ( (getOptions() & OPTION_DIRECT_BUFFER) == 
OPTION_DIRECT_BUFFER) {
                 buffer = ByteBuffer.allocateDirect(getRxBufSize());
@@ -75,17 +78,17 @@
         if (key == null) {
             return;    // just in case
         }
-        if ( log.isTraceEnabled() ) 
+        if ( log.isTraceEnabled() )
             log.trace("Servicing key:"+key);
 
         try {
             ObjectReader reader = (ObjectReader)key.attachment();
             if ( reader == null ) {
-                if ( log.isTraceEnabled() ) 
+                if ( log.isTraceEnabled() )
                     log.trace("No object reader, cancelling:"+key);
                 cancelKey(key);
             } else {
-                if ( log.isTraceEnabled() ) 
+                if ( log.isTraceEnabled() )
                     log.trace("Draining channel:"+key);
 
                 drainChannel(key, reader);
@@ -102,7 +105,7 @@
             } else if ( log.isErrorEnabled() ) {
                 //this is a real error, log it.
                 log.error("Exception caught in 
TcpReplicationThread.drainChannel.",e);
-            } 
+            }
             cancelKey(key);
         } finally {
 
@@ -143,16 +146,16 @@
     protected void drainChannel (final SelectionKey key, ObjectReader reader) 
throws Exception {
         reader.setLastAccess(System.currentTimeMillis());
         reader.access();
-        SocketChannel channel = (SocketChannel) key.channel();
+        ReadableByteChannel channel = (ReadableByteChannel) key.channel();
         int count;
         buffer.clear();                        // make buffer empty
 
         // loop while data available, channel is non-blocking
         while ((count = channel.read (buffer)) > 0) {
             buffer.flip();             // make buffer readable
-            if ( buffer.hasArray() ) 
+            if ( buffer.hasArray() )
                 reader.append(buffer.array(),0,count,false);
-            else 
+            else
                 reader.append(buffer,count,false);
             buffer.clear();            // make buffer empty
             //do we have at least one package?
@@ -160,24 +163,24 @@
         }
 
         int pkgcnt = reader.count();
-        
+
         if (count < 0 && pkgcnt == 0 ) {
             //end of stream, and no more packages to process
             remoteEof(key);
             return;
         }
-        
+
         ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : 
reader.execute();
-        
+
         registerForRead(key,reader);//register to read new data, before we 
send it off to avoid dead locks
-        
+
         for ( int i=0; i<msgs.length; i++ ) {
             /**
-             * Use send ack here if you want to ack the request to the remote 
+             * Use send ack here if you want to ack the request to the remote
              * server before completing the request
              * This is considered an asynchronized request
              */
-            if (ChannelData.sendAckAsync(msgs[i].getOptions())) 
sendAck(key,channel,Constants.ACK_COMMAND);
+            if (ChannelData.sendAckAsync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND);
             try {
                 if ( Logs.MESSAGES.isTraceEnabled() ) {
                     try {
@@ -187,24 +190,24 @@
                 //process the message
                 getCallback().messageDataReceived(msgs[i]);
                 /**
-                 * Use send ack here if you want the request to complete on 
this 
+                 * Use send ack here if you want the request to complete on 
this
                  * server before sending the ack to the remote server
                  * This is considered a synchronized request
                  */
-                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,channel,Constants.ACK_COMMAND);
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND);
             }catch ( RemoteProcessException e ) {
                 if ( log.isDebugEnabled() ) log.error("Processing of cluster 
message failed.",e);
-                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND);
             }catch ( Exception e ) {
                 log.error("Processing of cluster message failed.",e);
-                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND);
             }
             if ( getUseBufferPool() ) {
                 BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
                 msgs[i].setMessage(null);
             }
-        }                        
-        
+        }
+
         if (count < 0) {
             remoteEof(key);
             return;
@@ -218,7 +221,7 @@
     }
 
     protected void registerForRead(final SelectionKey key, ObjectReader 
reader) {
-        if ( log.isTraceEnabled() ) 
+        if ( log.isTraceEnabled() )
             log.trace("Adding key for read event:"+key);
         reader.finish();
         //register our OP_READ interest
@@ -231,12 +234,12 @@
                         // resume interest in OP_READ, OP_WRITE
                         int resumeOps = key.interestOps() | 
SelectionKey.OP_READ;
                         key.interestOps(resumeOps);
-                        if ( log.isTraceEnabled() ) 
+                        if ( log.isTraceEnabled() )
                             log.trace("Registering key for read:"+key);
                     }
                 } catch (CancelledKeyException ckx ) {
                     NioReceiver.cancelledKey(key);
-                    if ( log.isTraceEnabled() ) 
+                    if ( log.isTraceEnabled() )
                         log.trace("CKX Cancelling key:"+key);
 
                 } catch (Exception x) {
@@ -248,7 +251,7 @@
     }
 
     private void cancelKey(final SelectionKey key) {
-        if ( log.isTraceEnabled() ) 
+        if ( log.isTraceEnabled() )
             log.trace("Adding key for cancel event:"+key);
 
         ObjectReader reader = (ObjectReader)key.attachment();
@@ -258,7 +261,7 @@
         }
         Runnable cx = new Runnable() {
             public void run() {
-                if ( log.isTraceEnabled() ) 
+                if ( log.isTraceEnabled() )
                     log.trace("Cancelling key:"+key);
 
                 NioReceiver.cancelledKey(key);
@@ -266,8 +269,8 @@
         };
         receiver.addEvent(cx);
     }
-    
-    
+
+
 
 
 
@@ -276,8 +279,8 @@
      * @param key
      * @param channel
      */
-    protected void sendAck(SelectionKey key, SocketChannel channel, byte[] 
command) {
-        
+    protected void sendAck(SelectionKey key, WritableByteChannel channel, 
byte[] command) {
+
         try {
             ByteBuffer buf = ByteBuffer.wrap(command);
             int total = 0;
@@ -285,7 +288,10 @@
                 total += channel.write(buf);
             }
             if (log.isTraceEnabled()) {
-                log.trace("ACK sent to " + channel.socket().getPort());
+                log.trace("ACK sent to " +
+                        ( (channel instanceof SocketChannel) ?
+                          ((SocketChannel)channel).socket().getInetAddress() :
+                          
((DatagramChannel)channel).socket().getInetAddress()));
             }
         } catch ( java.io.IOException x ) {
             log.warn("Unable to send ACK back through channel, channel 
disconnected?: "+x.getMessage());

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java 
Mon Feb 18 14:07:09 2008
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
@@ -34,7 +35,7 @@
 
 /**
  * This class is NOT thread safe and should never be used with more than one 
thread at a time
- * 
+ *
  * This is a state machine, handled by the process method
  * States are:
  * - NOT_CONNECTED -> connect() -> CONNECTED
@@ -42,7 +43,7 @@
  * - READY_TO_WRITE -> write() -> READY TO WRITE | READY TO READ
  * - READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE
  * - TRANSFER_COMPLETE -> CONNECTED
- * 
+ *
  * @author Filip Hanik
  * @version 1.0
  */
@@ -50,10 +51,11 @@
 
     protected static org.apache.juli.logging.Log log = 
org.apache.juli.logging.LogFactory.getLog(NioSender.class);
 
-    
-    
-    protected Selector selector;    
-    protected SocketChannel socketChannel;
+
+
+    protected Selector selector;
+    protected SocketChannel socketChannel = null;
+    protected DatagramChannel dataChannel = null;
 
     /*
      * STATE VARIABLES *
@@ -64,14 +66,14 @@
     protected XByteBuffer ackbuf = new XByteBuffer(128,true);
     protected int remaining = 0;
     protected boolean complete;
-    
+
     protected boolean connecting = false;
-    
+
     public NioSender() {
         super();
-        
+
     }
-    
+
     /**
      * State machine to send data
      * @param key SelectionKey
@@ -89,7 +91,7 @@
                 completeConnect();
                 if ( current != null ) key.interestOps(key.interestOps() | 
SelectionKey.OP_WRITE);
                 return false;
-            } else  { 
+            } else  {
                 //wait for the connection to finish
                 key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
                 return false;
@@ -146,8 +148,8 @@
         socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
         socketChannel.socket().setTrafficClass(getSoTrafficClass());
     }
-    
-    
+
+
 
     protected boolean read(SelectionKey key) throws IOException {
         //if there is no message here, we are done
@@ -171,7 +173,7 @@
         }
     }
 
-    
+
     protected boolean write(SelectionKey key) throws IOException {
         if ( (!isConnected()) || (this.socketChannel==null)) {
             throw new IOException("NioSender is not connected, this should not 
occur.");
@@ -215,7 +217,7 @@
         } else {
             writebuf.clear();
         }
-        
+
         InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort());
         if ( socketChannel != null ) throw new IOException("Socket channel has 
already been established. Connection might be in progress.");
         socketChannel = SocketChannel.open();
@@ -227,7 +229,7 @@
             socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, 
this);
         }
     }
-    
+
 
     /**
      * disconnect
@@ -257,7 +259,7 @@
         }
 
     }
-    
+
     public void reset() {
         if ( isConnected() && readbuf == null) {
             readbuf = getReadBuffer();
@@ -273,10 +275,10 @@
         setConnectTime(-1);
     }
 
-    private ByteBuffer getReadBuffer() { 
+    private ByteBuffer getReadBuffer() {
         return getBuffer(getRxBufSize());
     }
-    
+
     private ByteBuffer getWriteBuffer() {
         return getBuffer(getTxBufSize());
     }
@@ -284,7 +286,7 @@
     private ByteBuffer getBuffer(int size) {
         return 
(getDirectBuffer()?ByteBuffer.allocateDirect(size):ByteBuffer.allocate(size));
     }
-    
+
     /**
     * sendMessage
     *
@@ -312,9 +314,9 @@
            if (isConnected()) {
                socketChannel.register(getSelector(), SelectionKey.OP_WRITE, 
this);
            }
-       } 
+       }
    }
-   
+
    public byte[] getMessage() {
        return current;
    }

Modified: 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java 
(original)
+++ 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java 
Mon Feb 18 14:07:09 2008
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,7 @@
  */
 package org.apache.catalina.tribes.test.channel;
 
+import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.group.GroupChannel;
 import junit.framework.TestCase;
 import org.apache.catalina.tribes.transport.ReceiverBase;
@@ -25,6 +26,7 @@
  */
 public class ChannelStartStop extends TestCase {
     GroupChannel channel = null;
+    int udpPort = 45543;
     protected void setUp() throws Exception {
         super.setUp();
         channel = new GroupChannel();
@@ -34,7 +36,7 @@
         super.tearDown();
         try {channel.stop(channel.DEFAULT);}catch (Exception ignore){}
     }
-    
+
     public void testDoubleFullStart() throws Exception {
         int count = 0;
         try {
@@ -52,11 +54,11 @@
     public void testScrap() throws Exception {
         System.out.println(channel.getChannelReceiver().getClass());
         ((ReceiverBase)channel.getChannelReceiver()).setMaxThreads(1);
-    } 
+    }
 
 
     public void testDoublePartialStart() throws Exception {
-        //try to double start the RX 
+        //try to double start the RX
         int count = 0;
         try {
             channel.start(channel.SND_RX_SEQ);
@@ -82,7 +84,7 @@
         } catch ( Exception x){/*expected*/}
         assertEquals(count,1);
         channel.stop(channel.DEFAULT);
-        
+
         count = 0;
         try {
             channel.start(channel.SND_RX_SEQ);
@@ -107,7 +109,7 @@
         assertEquals(count,1);
         channel.stop(channel.DEFAULT);
     }
-    
+
     public void testFalseOption() throws Exception {
         int flag = 0xFFF0;//should get ignored by the underlying components
         int count = 0;
@@ -121,6 +123,14 @@
         } catch ( Exception x){/*expected*/}
         assertEquals(count,2);
         channel.stop(channel.DEFAULT);
+    }
+
+    public void testUdpReceiverStart() throws Exception {
+        ReceiverBase rb = (ReceiverBase)channel.getChannelReceiver();
+        rb.setUdpPort(udpPort);
+        channel.start(Channel.DEFAULT);
+        Thread.sleep(1000);
+        channel.stop(Channel.DEFAULT);
     }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to