Author: fhanik Date: Mon Feb 18 14:07:09 2008 New Revision: 628881 URL: http://svn.apache.org/viewvc?rev=628881&view=rev Log: Starting to add in UDP support, still need to rethink how the sender is going to work
Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Channel.java?rev=628881&r1=628880&r2=628881&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/Channel.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/Channel.java Mon Feb 18 14:07:09 2008 @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -56,20 +56,20 @@ * | * Coordinator (implements MessageListener,MembershipListener,ChannelInterceptor) * -------------------- - * / | \ + * / | \ * / | \ * / | \ * / | \ * / | \ * MembershipService ChannelSender ChannelReceiver [IO layer] * </code></pre> - * + * * For example usage @see org.apache.catalina.tribes.group.GroupChannel * @author Filip Hanik * @version $Revision$, $Date$ */ public interface Channel { - + /** * Start and stop sequences can be controlled by these constants * This allows you to start separate components of the channel <br> @@ -119,7 +119,7 @@ * @see #stop(int) */ public static final int MBR_TX_SEQ = 8; - + /** * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors @@ -127,7 +127,7 @@ * However, there are five default flags that every channel implementation must implement<br> * SEND_OPTIONS_BYTE_MESSAGE - The message is a pure byte message and no marshalling or unmarshalling will * be performed.<br> - * + * * @see #send(Member[], Serializable , int) * @see #send(Member[], Serializable, int, ErrorHandler) */ @@ -150,27 +150,27 @@ * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack. <br> * However, there are five default flags that every channel implementation must implement<br> - * SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received when the message has been received and + * SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received when the message has been received and * processed by the recipient<br> * If no ack is received, the message is not considered successful<br> * @see #send(Member[], Serializable , int) * @see #send(Member[], Serializable, int, ErrorHandler) */ public static final int SEND_OPTIONS_SYNCHRONIZED_ACK = 0x0004; - + /** * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack. <br> * However, there are five default flags that every channel implementation must implement<br> - * SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when the message has been received and + * SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when the message has been received and * processed by the recipient<br> * If no ack is received, the message is not considered successful<br> * @see #send(Member[], Serializable , int) * @see #send(Member[], Serializable, int, ErrorHandler) */ public static final int SEND_OPTIONS_ASYNCHRONOUS = 0x0008; - + /** * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors @@ -181,7 +181,14 @@ * @see #send(Member[], Serializable, int, ErrorHandler) */ public static final int SEND_OPTIONS_SECURE = 0x0010; - + + /** + * Send options. When a message is sent with this flag on + * the system sends the message using UDP instead of TCP + * @see #send(Member[], Serializable , int) + * @see #send(Member[], Serializable, int, ErrorHandler) + */ + public static final int SEND_OPTIONS_UDP = 0x0020; /** * Send options, when a message is sent, it can have an option flag @@ -196,13 +203,13 @@ */ public static final int SEND_OPTIONS_DEFAULT = SEND_OPTIONS_USE_ACK; - + /** * Adds an interceptor to the channel message chain. * @param interceptor ChannelInterceptor */ public void addInterceptor(ChannelInterceptor interceptor); - + /** * Starts up the channel. This can be called multiple times for individual services to start * The svc parameter can be the logical or value of any constants @@ -212,7 +219,7 @@ * MBR_TX_SEQ - starts the membership broadcaster <BR> * SND_TX_SEQ - starts the replication transmitter<BR> * SND_RX_SEQ - starts the replication receiver<BR> - * <b>Note:</b> In order for the membership broadcaster to + * <b>Note:</b> In order for the membership broadcaster to * transmit the correct information, it has to be started after the replication receiver. * @throws ChannelException if a startup error occurs or the service is already started or an error occurs. */ @@ -229,14 +236,14 @@ * SND_RX_SEQ - stops the replication receiver<BR> * @throws ChannelException if a startup error occurs or the service is already stopped or an error occurs. */ - public void stop(int svc) throws ChannelException; - + public void stop(int svc) throws ChannelException; + /** * Send a message to one or more members in the cluster * @param destination Member[] - the destinations, can not be null or zero length, the reason for that * is that a membership change can occur and at that time the application is uncertain what group the message * actually got sent to. - * @param msg Serializable - the message to send, has to be serializable, or a <code>ByteMessage</code> to + * @param msg Serializable - the message to send, has to be serializable, or a <code>ByteMessage</code> to * send a pure byte array * @param options int - sender options, see class documentation for each interceptor that is configured in order to trigger interceptors * @return a unique Id that identifies the message that is sent @@ -257,10 +264,10 @@ * @exception ChannelException - if a serialization error happens. */ public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException; - + /** * Sends a heart beat through the interceptor stacks - * Use this method to alert interceptors and other components to + * Use this method to alert interceptors and other components to * clean up garbage, timed out messages etc.<br> * If you application has a background thread, then you can save one thread, * by configuring your channel to not use an internal heartbeat thread @@ -268,14 +275,14 @@ * @see #setHeartbeat(boolean) */ public void heartbeat(); - + /** * Enables or disables internal heartbeat. * @param enable boolean - default value is implementation specific * @see #heartbeat() */ public void setHeartbeat(boolean enable); - + /** * Add a membership listener, will get notified when a new member joins, leaves or crashes * <br>If the membership listener implements the Heartbeat interface @@ -284,7 +291,7 @@ * @see MembershipListener */ public void addMembershipListener(MembershipListener listener); - + /** * Add a channel listener, this is a callback object when messages are received * <br>If the channel listener implements the Heartbeat interface @@ -307,7 +314,7 @@ * @see ChannelListener */ public void removeChannelListener(ChannelListener listener); - + /** * Returns true if there are any members in the group, * this call is the same as <code>getMembers().length>0</code> @@ -317,7 +324,7 @@ /** * Get all current group members - * @return all members or empty array, never null + * @return all members or empty array, never null */ public Member[] getMembers() ; @@ -329,10 +336,10 @@ * @return Member */ public Member getLocalMember(boolean incAlive); - + /** - * Returns the member from the membership service with complete and - * recent data. Some implementations might serialize and send + * Returns the member from the membership service with complete and + * recent data. Some implementations might serialize and send * membership information along with a message, and instead of sending * complete membership details, only send the primary identifier for the member * but not the payload or other information. When such message is received @@ -343,5 +350,5 @@ */ public Member getMember(Member mbr); - + } Modified: tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java?rev=628881&r1=628880&r2=628881&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java Mon Feb 18 14:07:09 2008 @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -40,12 +40,15 @@ protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ObjectReader.class); private XByteBuffer buffer; - + protected long lastAccess = System.currentTimeMillis(); - + protected boolean accessed = false; private boolean cancelled; + public ObjectReader(int packetSize) { + this.buffer = new XByteBuffer(packetSize, true); + } /** * Creates an <code>ObjectReader</code> for a TCP NIO socket channel * @param channel - the channel to be read. @@ -53,7 +56,7 @@ public ObjectReader(SocketChannel channel) { this(channel.socket()); } - + /** * Creates an <code>ObjectReader</code> for a TCP socket * @param socket Socket @@ -67,23 +70,23 @@ this.buffer = new XByteBuffer(43800,true); } } - + public synchronized void access() { this.accessed = true; this.lastAccess = System.currentTimeMillis(); } - + public synchronized void finish() { this.accessed = false; this.lastAccess = System.currentTimeMillis(); } - + public boolean isAccessed() { return this.accessed; } /** - * Append new bytes to buffer. + * Append new bytes to buffer. * @see XByteBuffer#countPackages() * @param data new transfer buffer * @param off offset @@ -125,11 +128,11 @@ } return result; } - + public int bufferSize() { return buffer.getLength(); } - + public boolean hasPackage() { return buffer.countPackages(true)>0; @@ -141,7 +144,7 @@ public int count() { return buffer.countPackages(); } - + public void close() { this.buffer = null; } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java?rev=628881&r1=628880&r2=628881&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java Mon Feb 18 14:07:09 2008 @@ -34,7 +34,7 @@ * @version 1.0 */ public abstract class AbstractSender implements DataSender { - + private boolean connected = false; private int rxBufSize = 25188; private int txBufSize = 43800; @@ -57,7 +57,9 @@ private int soLingerTime = 3; private int soTrafficClass = 0x04 | 0x08 | 0x010; private boolean throwOnFailedAck = true; - + private boolean udpBased = false; + private int udpPort = -1; + /** * transfers sender properties from one sender to another * @param from AbstractSender @@ -82,13 +84,15 @@ to.soLingerTime = from.soLingerTime; to.soTrafficClass = from.soTrafficClass; to.throwOnFailedAck = from.throwOnFailedAck; - } + to.udpBased = from.udpBased; + to.udpPort = from.udpPort; + } + - public AbstractSender() { - + } - + /** * connect * @@ -117,11 +121,11 @@ if ( disconnect ) disconnect(); return disconnect; } - + protected void setConnected(boolean connected){ this.connected = connected; } - + public boolean isConnected() { return connected; } @@ -170,7 +174,7 @@ public int getMaxRetryAttempts() { return maxRetryAttempts; } - + public void setDirect(boolean direct) { setDirectBuffer(direct); } @@ -182,7 +186,7 @@ public boolean getDirect() { return getDirectBuffer(); } - + public boolean getDirectBuffer() { return this.directBuffer; } @@ -304,6 +308,26 @@ public void setAddress(InetAddress address) { this.address = address; + } + + + public boolean isUdpBased() { + return udpBased; + } + + + public void setUdpBased(boolean udpBased) { + this.udpBased = udpBased; + } + + + public int getUdpPort() { + return udpPort; + } + + + public void setUdpPort(int udpPort) { + this.udpPort = udpPort; } } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=628881&r1=628880&r2=628881&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java Mon Feb 18 14:07:09 2008 @@ -18,7 +18,9 @@ package org.apache.catalina.tribes.transport.nio; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.nio.channels.DatagramChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -59,6 +61,7 @@ private Selector selector = null; private ServerSocketChannel serverChannel = null; + private DatagramChannel datagramChannel = null; protected LinkedList events = new LinkedList(); // private Object interestOpsMutex = new Object(); @@ -110,7 +113,7 @@ else throw new IOException(x.getMessage()); } } - + public AbstractRxTask createRxTask() { NioReplicationTask thread = new NioReplicationTask(this,this); thread.setUseBufferPool(this.getUseBufferPool()); @@ -118,9 +121,9 @@ thread.setOptions(getWorkerThreadOptions()); return thread; } - - - + + + protected void bind() throws IOException { // allocate an unbound server socket channel serverChannel = ServerSocketChannel.open(); @@ -135,9 +138,22 @@ serverChannel.configureBlocking(false); // register the ServerSocketChannel with the Selector serverChannel.register(selector, SelectionKey.OP_ACCEPT); - + + //set up the datagram channel + if (this.getUdpPort()>0) { + datagramChannel = DatagramChannel.open(); + datagramChannel.configureBlocking(false); + //bind to the address to avoid security checks + InetSocketAddress daddr = new InetSocketAddress(getBind(),getUdpPort()); + //TODO should we auto increment the UDP port to avoid collisions? + //we could auto increment with the offset from the tcp listen port + datagramChannel.connect(daddr); + } + + + } - + public void addEvent(Runnable event) { if ( selector != null ) { synchronized (events) { @@ -163,18 +179,18 @@ events.clear(); } } - + public static void cancelledKey(SelectionKey key) { ObjectReader reader = (ObjectReader)key.attachment(); if ( reader != null ) { reader.setCancelled(true); reader.finish(); } - key.cancel(); + key.cancel(); key.attach(null); try { ((SocketChannel)key.channel()).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); } try { key.channel().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); } - + } protected long lastCheck = System.currentTimeMillis(); protected void socketTimeouts() { @@ -202,7 +218,7 @@ if ( ka != null ) { long delta = now - ka.getLastAccess(); if (delta > (long) getTimeout() && (!ka.isAccessed())) { - if (log.isWarnEnabled()) + if (log.isWarnEnabled()) log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess())+" Possible cause: all threads used, perform thread dump"); ka.setLastAccess(now); //key.interestOps(SelectionKey.OP_READ); @@ -230,8 +246,12 @@ log.warn("ServerSocketChannel already started"); return; } - + setListen(true); + if (selector!=null && datagramChannel!=null) { + ObjectReader oreader = new ObjectReader(1024*65); + registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader); + } while (doListen() && selector != null) { // this may block for a long time, upon return the @@ -302,10 +322,18 @@ } serverChannel.close(); + if (datagramChannel!=null) { + try { + datagramChannel.close(); + }catch (Exception iox) { + if (log.isDebugEnabled()) log.debug("Unable to close datagram channel.",iox); + } + datagramChannel=null; + } closeSelector(); } - + /** * Close Selector. Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=628881&r1=628880&r2=628881&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Mon Feb 18 14:07:09 2008 @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +18,9 @@ package org.apache.catalina.tribes.transport.nio; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; @@ -42,15 +45,15 @@ * serviceChannel() method stores the key reference in the thread object then * calls notify() to wake it up. When the channel has been drained, the worker * thread returns itself to its parent pool. - * + * * @author Filip Hanik - * + * * @version $Revision$, $Date$ */ public class NioReplicationTask extends AbstractRxTask { - + private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationTask.class ); - + private ByteBuffer buffer = null; private SelectionKey key; private int rxBufSize; @@ -62,7 +65,7 @@ } // loop forever waiting for work to do - public synchronized void run() { + public synchronized void run() { if ( buffer == null ) { if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) { buffer = ByteBuffer.allocateDirect(getRxBufSize()); @@ -75,17 +78,17 @@ if (key == null) { return; // just in case } - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("Servicing key:"+key); try { ObjectReader reader = (ObjectReader)key.attachment(); if ( reader == null ) { - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("No object reader, cancelling:"+key); cancelKey(key); } else { - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("Draining channel:"+key); drainChannel(key, reader); @@ -102,7 +105,7 @@ } else if ( log.isErrorEnabled() ) { //this is a real error, log it. log.error("Exception caught in TcpReplicationThread.drainChannel.",e); - } + } cancelKey(key); } finally { @@ -143,16 +146,16 @@ protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception { reader.setLastAccess(System.currentTimeMillis()); reader.access(); - SocketChannel channel = (SocketChannel) key.channel(); + ReadableByteChannel channel = (ReadableByteChannel) key.channel(); int count; buffer.clear(); // make buffer empty // loop while data available, channel is non-blocking while ((count = channel.read (buffer)) > 0) { buffer.flip(); // make buffer readable - if ( buffer.hasArray() ) + if ( buffer.hasArray() ) reader.append(buffer.array(),0,count,false); - else + else reader.append(buffer,count,false); buffer.clear(); // make buffer empty //do we have at least one package? @@ -160,24 +163,24 @@ } int pkgcnt = reader.count(); - + if (count < 0 && pkgcnt == 0 ) { //end of stream, and no more packages to process remoteEof(key); return; } - + ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute(); - + registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks - + for ( int i=0; i<msgs.length; i++ ) { /** - * Use send ack here if you want to ack the request to the remote + * Use send ack here if you want to ack the request to the remote * server before completing the request * This is considered an asynchronized request */ - if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); + if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND); try { if ( Logs.MESSAGES.isTraceEnabled() ) { try { @@ -187,24 +190,24 @@ //process the message getCallback().messageDataReceived(msgs[i]); /** - * Use send ack here if you want the request to complete on this + * Use send ack here if you want the request to complete on this * server before sending the ack to the remote server * This is considered a synchronized request */ - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND); }catch ( RemoteProcessException e ) { if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e); - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND); }catch ( Exception e ) { log.error("Processing of cluster message failed.",e); - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND); } if ( getUseBufferPool() ) { BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); msgs[i].setMessage(null); } - } - + } + if (count < 0) { remoteEof(key); return; @@ -218,7 +221,7 @@ } protected void registerForRead(final SelectionKey key, ObjectReader reader) { - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("Adding key for read event:"+key); reader.finish(); //register our OP_READ interest @@ -231,12 +234,12 @@ // resume interest in OP_READ, OP_WRITE int resumeOps = key.interestOps() | SelectionKey.OP_READ; key.interestOps(resumeOps); - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("Registering key for read:"+key); } } catch (CancelledKeyException ckx ) { NioReceiver.cancelledKey(key); - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("CKX Cancelling key:"+key); } catch (Exception x) { @@ -248,7 +251,7 @@ } private void cancelKey(final SelectionKey key) { - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("Adding key for cancel event:"+key); ObjectReader reader = (ObjectReader)key.attachment(); @@ -258,7 +261,7 @@ } Runnable cx = new Runnable() { public void run() { - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("Cancelling key:"+key); NioReceiver.cancelledKey(key); @@ -266,8 +269,8 @@ }; receiver.addEvent(cx); } - - + + @@ -276,8 +279,8 @@ * @param key * @param channel */ - protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) { - + protected void sendAck(SelectionKey key, WritableByteChannel channel, byte[] command) { + try { ByteBuffer buf = ByteBuffer.wrap(command); int total = 0; @@ -285,7 +288,10 @@ total += channel.write(buf); } if (log.isTraceEnabled()) { - log.trace("ACK sent to " + channel.socket().getPort()); + log.trace("ACK sent to " + + ( (channel instanceof SocketChannel) ? + ((SocketChannel)channel).socket().getInetAddress() : + ((DatagramChannel)channel).socket().getInetAddress())); } } catch ( java.io.IOException x ) { log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage()); Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=628881&r1=628880&r2=628881&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java Mon Feb 18 14:07:09 2008 @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -34,7 +35,7 @@ /** * This class is NOT thread safe and should never be used with more than one thread at a time - * + * * This is a state machine, handled by the process method * States are: * - NOT_CONNECTED -> connect() -> CONNECTED @@ -42,7 +43,7 @@ * - READY_TO_WRITE -> write() -> READY TO WRITE | READY TO READ * - READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE * - TRANSFER_COMPLETE -> CONNECTED - * + * * @author Filip Hanik * @version 1.0 */ @@ -50,10 +51,11 @@ protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(NioSender.class); - - - protected Selector selector; - protected SocketChannel socketChannel; + + + protected Selector selector; + protected SocketChannel socketChannel = null; + protected DatagramChannel dataChannel = null; /* * STATE VARIABLES * @@ -64,14 +66,14 @@ protected XByteBuffer ackbuf = new XByteBuffer(128,true); protected int remaining = 0; protected boolean complete; - + protected boolean connecting = false; - + public NioSender() { super(); - + } - + /** * State machine to send data * @param key SelectionKey @@ -89,7 +91,7 @@ completeConnect(); if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); return false; - } else { + } else { //wait for the connection to finish key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT); return false; @@ -146,8 +148,8 @@ socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); socketChannel.socket().setTrafficClass(getSoTrafficClass()); } - - + + protected boolean read(SelectionKey key) throws IOException { //if there is no message here, we are done @@ -171,7 +173,7 @@ } } - + protected boolean write(SelectionKey key) throws IOException { if ( (!isConnected()) || (this.socketChannel==null)) { throw new IOException("NioSender is not connected, this should not occur."); @@ -215,7 +217,7 @@ } else { writebuf.clear(); } - + InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort()); if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress."); socketChannel = SocketChannel.open(); @@ -227,7 +229,7 @@ socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, this); } } - + /** * disconnect @@ -257,7 +259,7 @@ } } - + public void reset() { if ( isConnected() && readbuf == null) { readbuf = getReadBuffer(); @@ -273,10 +275,10 @@ setConnectTime(-1); } - private ByteBuffer getReadBuffer() { + private ByteBuffer getReadBuffer() { return getBuffer(getRxBufSize()); } - + private ByteBuffer getWriteBuffer() { return getBuffer(getTxBufSize()); } @@ -284,7 +286,7 @@ private ByteBuffer getBuffer(int size) { return (getDirectBuffer()?ByteBuffer.allocateDirect(size):ByteBuffer.allocate(size)); } - + /** * sendMessage * @@ -312,9 +314,9 @@ if (isConnected()) { socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); } - } + } } - + public byte[] getMessage() { return current; } Modified: tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java?rev=628881&r1=628880&r2=628881&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java (original) +++ tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java Mon Feb 18 14:07:09 2008 @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,6 +15,7 @@ */ package org.apache.catalina.tribes.test.channel; +import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.group.GroupChannel; import junit.framework.TestCase; import org.apache.catalina.tribes.transport.ReceiverBase; @@ -25,6 +26,7 @@ */ public class ChannelStartStop extends TestCase { GroupChannel channel = null; + int udpPort = 45543; protected void setUp() throws Exception { super.setUp(); channel = new GroupChannel(); @@ -34,7 +36,7 @@ super.tearDown(); try {channel.stop(channel.DEFAULT);}catch (Exception ignore){} } - + public void testDoubleFullStart() throws Exception { int count = 0; try { @@ -52,11 +54,11 @@ public void testScrap() throws Exception { System.out.println(channel.getChannelReceiver().getClass()); ((ReceiverBase)channel.getChannelReceiver()).setMaxThreads(1); - } + } public void testDoublePartialStart() throws Exception { - //try to double start the RX + //try to double start the RX int count = 0; try { channel.start(channel.SND_RX_SEQ); @@ -82,7 +84,7 @@ } catch ( Exception x){/*expected*/} assertEquals(count,1); channel.stop(channel.DEFAULT); - + count = 0; try { channel.start(channel.SND_RX_SEQ); @@ -107,7 +109,7 @@ assertEquals(count,1); channel.stop(channel.DEFAULT); } - + public void testFalseOption() throws Exception { int flag = 0xFFF0;//should get ignored by the underlying components int count = 0; @@ -121,6 +123,14 @@ } catch ( Exception x){/*expected*/} assertEquals(count,2); channel.stop(channel.DEFAULT); + } + + public void testUdpReceiverStart() throws Exception { + ReceiverBase rb = (ReceiverBase)channel.getChannelReceiver(); + rb.setUdpPort(udpPort); + channel.start(Channel.DEFAULT); + Thread.sleep(1000); + channel.stop(Channel.DEFAULT); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]