Author: fhanik
Date: Mon Feb 18 14:07:09 2008
New Revision: 628881
URL: http://svn.apache.org/viewvc?rev=628881&view=rev
Log:
Starting to add in UDP support, still need to rethink how the sender is going
to work
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/Channel.java
tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Channel.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/Channel.java Mon Feb 18
14:07:09 2008
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -56,20 +56,20 @@
* |
* Coordinator (implements
MessageListener,MembershipListener,ChannelInterceptor)
* --------------------
- * / | \
+ * / | \
* / | \
* / | \
* / | \
* / | \
* MembershipService ChannelSender ChannelReceiver
[IO layer]
* </code></pre>
- *
+ *
* For example usage @see org.apache.catalina.tribes.group.GroupChannel
* @author Filip Hanik
* @version $Revision$, $Date$
*/
public interface Channel {
-
+
/**
* Start and stop sequences can be controlled by these constants
* This allows you to start separate components of the channel <br>
@@ -119,7 +119,7 @@
* @see #stop(int)
*/
public static final int MBR_TX_SEQ = 8;
-
+
/**
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel
interceptors
@@ -127,7 +127,7 @@
* However, there are five default flags that every channel implementation
must implement<br>
* SEND_OPTIONS_BYTE_MESSAGE - The message is a pure byte message and no
marshalling or unmarshalling will
* be performed.<br>
- *
+ *
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
@@ -150,27 +150,27 @@
* to trigger certain behavior. Most flags are used to trigger channel
interceptors
* as the message passes through the channel stack. <br>
* However, there are five default flags that every channel implementation
must implement<br>
- * SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received
when the message has been received and
+ * SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received
when the message has been received and
* processed by the recipient<br>
* If no ack is received, the message is not considered successful<br>
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
public static final int SEND_OPTIONS_SYNCHRONIZED_ACK = 0x0004;
-
+
/**
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel
interceptors
* as the message passes through the channel stack. <br>
* However, there are five default flags that every channel implementation
must implement<br>
- * SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when
the message has been received and
+ * SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when
the message has been received and
* processed by the recipient<br>
* If no ack is received, the message is not considered successful<br>
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
public static final int SEND_OPTIONS_ASYNCHRONOUS = 0x0008;
-
+
/**
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel
interceptors
@@ -181,7 +181,14 @@
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
public static final int SEND_OPTIONS_SECURE = 0x0010;
-
+
+ /**
+ * Send options. When a message is sent with this flag on
+ * the system sends the message using UDP instead of TCP
+ * @see #send(Member[], Serializable , int)
+ * @see #send(Member[], Serializable, int, ErrorHandler)
+ */
+ public static final int SEND_OPTIONS_UDP = 0x0020;
/**
* Send options, when a message is sent, it can have an option flag
@@ -196,13 +203,13 @@
*/
public static final int SEND_OPTIONS_DEFAULT = SEND_OPTIONS_USE_ACK;
-
+
/**
* Adds an interceptor to the channel message chain.
* @param interceptor ChannelInterceptor
*/
public void addInterceptor(ChannelInterceptor interceptor);
-
+
/**
* Starts up the channel. This can be called multiple times for individual
services to start
* The svc parameter can be the logical or value of any constants
@@ -212,7 +219,7 @@
* MBR_TX_SEQ - starts the membership broadcaster <BR>
* SND_TX_SEQ - starts the replication transmitter<BR>
* SND_RX_SEQ - starts the replication receiver<BR>
- * <b>Note:</b> In order for the membership broadcaster to
+ * <b>Note:</b> In order for the membership broadcaster to
* transmit the correct information, it has to be started after the
replication receiver.
* @throws ChannelException if a startup error occurs or the service is
already started or an error occurs.
*/
@@ -229,14 +236,14 @@
* SND_RX_SEQ - stops the replication receiver<BR>
* @throws ChannelException if a startup error occurs or the service is
already stopped or an error occurs.
*/
- public void stop(int svc) throws ChannelException;
-
+ public void stop(int svc) throws ChannelException;
+
/**
* Send a message to one or more members in the cluster
* @param destination Member[] - the destinations, can not be null or zero
length, the reason for that
* is that a membership change can occur and at that time the application
is uncertain what group the message
* actually got sent to.
- * @param msg Serializable - the message to send, has to be serializable,
or a <code>ByteMessage</code> to
+ * @param msg Serializable - the message to send, has to be serializable,
or a <code>ByteMessage</code> to
* send a pure byte array
* @param options int - sender options, see class documentation for each
interceptor that is configured in order to trigger interceptors
* @return a unique Id that identifies the message that is sent
@@ -257,10 +264,10 @@
* @exception ChannelException - if a serialization error happens.
*/
public UniqueId send(Member[] destination, Serializable msg, int options,
ErrorHandler handler) throws ChannelException;
-
+
/**
* Sends a heart beat through the interceptor stacks
- * Use this method to alert interceptors and other components to
+ * Use this method to alert interceptors and other components to
* clean up garbage, timed out messages etc.<br>
* If your application has a background thread, then you can save one
thread,
* by configuring your channel to not use an internal heartbeat thread
@@ -268,14 +275,14 @@
* @see #setHeartbeat(boolean)
*/
public void heartbeat();
-
+
/**
* Enables or disables internal heartbeat.
* @param enable boolean - default value is implementation specific
* @see #heartbeat()
*/
public void setHeartbeat(boolean enable);
-
+
/**
* Add a membership listener, will get notified when a new member joins,
leaves or crashes
* <br>If the membership listener implements the Heartbeat interface
@@ -284,7 +291,7 @@
* @see MembershipListener
*/
public void addMembershipListener(MembershipListener listener);
-
+
/**
* Add a channel listener, this is a callback object when messages are
received
* <br>If the channel listener implements the Heartbeat interface
@@ -307,7 +314,7 @@
* @see ChannelListener
*/
public void removeChannelListener(ChannelListener listener);
-
+
/**
* Returns true if there are any members in the group,
* this call is the same as <code>getMembers().length>0</code>
@@ -317,7 +324,7 @@
/**
* Get all current group members
- * @return all members or empty array, never null
+ * @return all members or empty array, never null
*/
public Member[] getMembers() ;
@@ -329,10 +336,10 @@
* @return Member
*/
public Member getLocalMember(boolean incAlive);
-
+
/**
- * Returns the member from the membership service with complete and
- * recent data. Some implementations might serialize and send
+ * Returns the member from the membership service with complete and
+ * recent data. Some implementations might serialize and send
* membership information along with a message, and instead of sending
* complete membership details, only send the primary identifier for the
member
* but not the payload or other information. When such message is received
@@ -343,5 +350,5 @@
*/
public Member getMember(Member mbr);
-
+
}
Modified: tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/io/ObjectReader.java Mon Feb
18 14:07:09 2008
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -40,12 +40,15 @@
protected static org.apache.juli.logging.Log log =
org.apache.juli.logging.LogFactory.getLog(ObjectReader.class);
private XByteBuffer buffer;
-
+
protected long lastAccess = System.currentTimeMillis();
-
+
protected boolean accessed = false;
private boolean cancelled;
+ public ObjectReader(int packetSize) {
+ this.buffer = new XByteBuffer(packetSize, true);
+ }
/**
* Creates an <code>ObjectReader</code> for a TCP NIO socket channel
* @param channel - the channel to be read.
@@ -53,7 +56,7 @@
public ObjectReader(SocketChannel channel) {
this(channel.socket());
}
-
+
/**
* Creates an <code>ObjectReader</code> for a TCP socket
* @param socket Socket
@@ -67,23 +70,23 @@
this.buffer = new XByteBuffer(43800,true);
}
}
-
+
public synchronized void access() {
this.accessed = true;
this.lastAccess = System.currentTimeMillis();
}
-
+
public synchronized void finish() {
this.accessed = false;
this.lastAccess = System.currentTimeMillis();
}
-
+
public boolean isAccessed() {
return this.accessed;
}
/**
- * Append new bytes to buffer.
+ * Append new bytes to buffer.
* @see XByteBuffer#countPackages()
* @param data new transfer buffer
* @param off offset
@@ -125,11 +128,11 @@
}
return result;
}
-
+
public int bufferSize() {
return buffer.getLength();
}
-
+
public boolean hasPackage() {
return buffer.countPackages(true)>0;
@@ -141,7 +144,7 @@
public int count() {
return buffer.countPackages();
}
-
+
public void close() {
this.buffer = null;
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
Mon Feb 18 14:07:09 2008
@@ -34,7 +34,7 @@
* @version 1.0
*/
public abstract class AbstractSender implements DataSender {
-
+
private boolean connected = false;
private int rxBufSize = 25188;
private int txBufSize = 43800;
@@ -57,7 +57,9 @@
private int soLingerTime = 3;
private int soTrafficClass = 0x04 | 0x08 | 0x010;
private boolean throwOnFailedAck = true;
-
+ private boolean udpBased = false;
+ private int udpPort = -1;
+
/**
* transfers sender properties from one sender to another
* @param from AbstractSender
@@ -82,13 +84,15 @@
to.soLingerTime = from.soLingerTime;
to.soTrafficClass = from.soTrafficClass;
to.throwOnFailedAck = from.throwOnFailedAck;
- }
+ to.udpBased = from.udpBased;
+ to.udpPort = from.udpPort;
+ }
+
-
public AbstractSender() {
-
+
}
-
+
/**
* connect
*
@@ -117,11 +121,11 @@
if ( disconnect ) disconnect();
return disconnect;
}
-
+
protected void setConnected(boolean connected){
this.connected = connected;
}
-
+
public boolean isConnected() {
return connected;
}
@@ -170,7 +174,7 @@
public int getMaxRetryAttempts() {
return maxRetryAttempts;
}
-
+
public void setDirect(boolean direct) {
setDirectBuffer(direct);
}
@@ -182,7 +186,7 @@
public boolean getDirect() {
return getDirectBuffer();
}
-
+
public boolean getDirectBuffer() {
return this.directBuffer;
}
@@ -304,6 +308,26 @@
public void setAddress(InetAddress address) {
this.address = address;
+ }
+
+
+ public boolean isUdpBased() {
+ return udpBased;
+ }
+
+
+ public void setUdpBased(boolean udpBased) {
+ this.udpBased = udpBased;
+ }
+
+
+ public int getUdpPort() {
+ return udpPort;
+ }
+
+
+ public void setUdpPort(int udpPort) {
+ this.udpPort = udpPort;
}
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
Mon Feb 18 14:07:09 2008
@@ -18,7 +18,9 @@
package org.apache.catalina.tribes.transport.nio;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -59,6 +61,7 @@
private Selector selector = null;
private ServerSocketChannel serverChannel = null;
+ private DatagramChannel datagramChannel = null;
protected LinkedList events = new LinkedList();
// private Object interestOpsMutex = new Object();
@@ -110,7 +113,7 @@
else throw new IOException(x.getMessage());
}
}
-
+
public AbstractRxTask createRxTask() {
NioReplicationTask thread = new NioReplicationTask(this,this);
thread.setUseBufferPool(this.getUseBufferPool());
@@ -118,9 +121,9 @@
thread.setOptions(getWorkerThreadOptions());
return thread;
}
-
-
-
+
+
+
protected void bind() throws IOException {
// allocate an unbound server socket channel
serverChannel = ServerSocketChannel.open();
@@ -135,9 +138,22 @@
serverChannel.configureBlocking(false);
// register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
-
+
+ //set up the datagram channel
+ if (this.getUdpPort()>0) {
+ datagramChannel = DatagramChannel.open();
+ datagramChannel.configureBlocking(false);
+ //bind to the address to avoid security checks
+ InetSocketAddress daddr = new
InetSocketAddress(getBind(),getUdpPort());
+ //TODO should we auto increment the UDP port to avoid collisions?
+ //we could auto increment with the offset from the tcp listen port
+ datagramChannel.connect(daddr);
+ }
+
+
+
}
-
+
public void addEvent(Runnable event) {
if ( selector != null ) {
synchronized (events) {
@@ -163,18 +179,18 @@
events.clear();
}
}
-
+
public static void cancelledKey(SelectionKey key) {
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader != null ) {
reader.setCancelled(true);
reader.finish();
}
- key.cancel();
+ key.cancel();
key.attach(null);
try { ((SocketChannel)key.channel()).socket().close(); } catch
(IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
try { key.channel().close(); } catch (IOException e) { if
(log.isDebugEnabled()) log.debug("", e); }
-
+
}
protected long lastCheck = System.currentTimeMillis();
protected void socketTimeouts() {
@@ -202,7 +218,7 @@
if ( ka != null ) {
long delta = now - ka.getLastAccess();
if (delta > (long) getTimeout() && (!ka.isAccessed()))
{
- if (log.isWarnEnabled())
+ if (log.isWarnEnabled())
log.warn("Channel key is registered, but has
had no interest ops for the last "+getTimeout()+" ms.
(cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new
java.sql.Timestamp(ka.getLastAccess())+" Possible cause: all threads used,
perform thread dump");
ka.setLastAccess(now);
//key.interestOps(SelectionKey.OP_READ);
@@ -230,8 +246,12 @@
log.warn("ServerSocketChannel already started");
return;
}
-
+
setListen(true);
+ if (selector!=null && datagramChannel!=null) {
+ ObjectReader oreader = new ObjectReader(1024*65);
+
registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader);
+ }
while (doListen() && selector != null) {
// this may block for a long time, upon return the
@@ -302,10 +322,18 @@
}
serverChannel.close();
+ if (datagramChannel!=null) {
+ try {
+ datagramChannel.close();
+ }catch (Exception iox) {
+ if (log.isDebugEnabled()) log.debug("Unable to close datagram
channel.",iox);
+ }
+ datagramChannel=null;
+ }
closeSelector();
}
-
+
/**
* Close Selector.
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
(original)
+++
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
Mon Feb 18 14:07:09 2008
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,9 @@
package org.apache.catalina.tribes.transport.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
@@ -42,15 +45,15 @@
* serviceChannel() method stores the key reference in the thread object then
* calls notify() to wake it up. When the channel has been drained, the worker
* thread returns itself to its parent pool.
- *
+ *
* @author Filip Hanik
- *
+ *
* @version $Revision$, $Date$
*/
public class NioReplicationTask extends AbstractRxTask {
-
+
private static org.apache.juli.logging.Log log =
org.apache.juli.logging.LogFactory.getLog( NioReplicationTask.class );
-
+
private ByteBuffer buffer = null;
private SelectionKey key;
private int rxBufSize;
@@ -62,7 +65,7 @@
}
// loop forever waiting for work to do
- public synchronized void run() {
+ public synchronized void run() {
if ( buffer == null ) {
if ( (getOptions() & OPTION_DIRECT_BUFFER) ==
OPTION_DIRECT_BUFFER) {
buffer = ByteBuffer.allocateDirect(getRxBufSize());
@@ -75,17 +78,17 @@
if (key == null) {
return; // just in case
}
- if ( log.isTraceEnabled() )
+ if ( log.isTraceEnabled() )
log.trace("Servicing key:"+key);
try {
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader == null ) {
- if ( log.isTraceEnabled() )
+ if ( log.isTraceEnabled() )
log.trace("No object reader, cancelling:"+key);
cancelKey(key);
} else {
- if ( log.isTraceEnabled() )
+ if ( log.isTraceEnabled() )
log.trace("Draining channel:"+key);
drainChannel(key, reader);
@@ -102,7 +105,7 @@
} else if ( log.isErrorEnabled() ) {
//this is a real error, log it.
log.error("Exception caught in
TcpReplicationThread.drainChannel.",e);
- }
+ }
cancelKey(key);
} finally {
@@ -143,16 +146,16 @@
protected void drainChannel (final SelectionKey key, ObjectReader reader)
throws Exception {
reader.setLastAccess(System.currentTimeMillis());
reader.access();
- SocketChannel channel = (SocketChannel) key.channel();
+ ReadableByteChannel channel = (ReadableByteChannel) key.channel();
int count;
buffer.clear(); // make buffer empty
// loop while data available, channel is non-blocking
while ((count = channel.read (buffer)) > 0) {
buffer.flip(); // make buffer readable
- if ( buffer.hasArray() )
+ if ( buffer.hasArray() )
reader.append(buffer.array(),0,count,false);
- else
+ else
reader.append(buffer,count,false);
buffer.clear(); // make buffer empty
//do we have at least one package?
@@ -160,24 +163,24 @@
}
int pkgcnt = reader.count();
-
+
if (count < 0 && pkgcnt == 0 ) {
//end of stream, and no more packages to process
remoteEof(key);
return;
}
-
+
ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY :
reader.execute();
-
+
registerForRead(key,reader);//register to read new data, before we
send it off to avoid dead locks
-
+
for ( int i=0; i<msgs.length; i++ ) {
/**
- * Use send ack here if you want to ack the request to the remote
+ * Use send ack here if you want to ack the request to the remote
* server before completing the request
* This is considered an asynchronous request
*/
- if (ChannelData.sendAckAsync(msgs[i].getOptions()))
sendAck(key,channel,Constants.ACK_COMMAND);
+ if (ChannelData.sendAckAsync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND);
try {
if ( Logs.MESSAGES.isTraceEnabled() ) {
try {
@@ -187,24 +190,24 @@
//process the message
getCallback().messageDataReceived(msgs[i]);
/**
- * Use send ack here if you want the request to complete on
this
+ * Use send ack here if you want the request to complete on
this
* server before sending the ack to the remote server
* This is considered a synchronized request
*/
- if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,channel,Constants.ACK_COMMAND);
+ if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND);
}catch ( RemoteProcessException e ) {
if ( log.isDebugEnabled() ) log.error("Processing of cluster
message failed.",e);
- if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+ if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND);
}catch ( Exception e ) {
log.error("Processing of cluster message failed.",e);
- if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+ if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND);
}
if ( getUseBufferPool() ) {
BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
msgs[i].setMessage(null);
}
- }
-
+ }
+
if (count < 0) {
remoteEof(key);
return;
@@ -218,7 +221,7 @@
}
protected void registerForRead(final SelectionKey key, ObjectReader
reader) {
- if ( log.isTraceEnabled() )
+ if ( log.isTraceEnabled() )
log.trace("Adding key for read event:"+key);
reader.finish();
//register our OP_READ interest
@@ -231,12 +234,12 @@
// resume interest in OP_READ, OP_WRITE
int resumeOps = key.interestOps() |
SelectionKey.OP_READ;
key.interestOps(resumeOps);
- if ( log.isTraceEnabled() )
+ if ( log.isTraceEnabled() )
log.trace("Registering key for read:"+key);
}
} catch (CancelledKeyException ckx ) {
NioReceiver.cancelledKey(key);
- if ( log.isTraceEnabled() )
+ if ( log.isTraceEnabled() )
log.trace("CKX Cancelling key:"+key);
} catch (Exception x) {
@@ -248,7 +251,7 @@
}
private void cancelKey(final SelectionKey key) {
- if ( log.isTraceEnabled() )
+ if ( log.isTraceEnabled() )
log.trace("Adding key for cancel event:"+key);
ObjectReader reader = (ObjectReader)key.attachment();
@@ -258,7 +261,7 @@
}
Runnable cx = new Runnable() {
public void run() {
- if ( log.isTraceEnabled() )
+ if ( log.isTraceEnabled() )
log.trace("Cancelling key:"+key);
NioReceiver.cancelledKey(key);
@@ -266,8 +269,8 @@
};
receiver.addEvent(cx);
}
-
-
+
+
@@ -276,8 +279,8 @@
* @param key
* @param channel
*/
- protected void sendAck(SelectionKey key, SocketChannel channel, byte[]
command) {
-
+ protected void sendAck(SelectionKey key, WritableByteChannel channel,
byte[] command) {
+
try {
ByteBuffer buf = ByteBuffer.wrap(command);
int total = 0;
@@ -285,7 +288,10 @@
total += channel.write(buf);
}
if (log.isTraceEnabled()) {
- log.trace("ACK sent to " + channel.socket().getPort());
+ log.trace("ACK sent to " +
+ ( (channel instanceof SocketChannel) ?
+ ((SocketChannel)channel).socket().getInetAddress() :
+
((DatagramChannel)channel).socket().getInetAddress()));
}
} catch ( java.io.IOException x ) {
log.warn("Unable to send ACK back through channel, channel
disconnected?: "+x.getMessage());
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
Mon Feb 18 14:07:09 2008
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
@@ -34,7 +35,7 @@
/**
* This class is NOT thread safe and should never be used with more than one
thread at a time
- *
+ *
* This is a state machine, handled by the process method
* States are:
* - NOT_CONNECTED -> connect() -> CONNECTED
@@ -42,7 +43,7 @@
* - READY_TO_WRITE -> write() -> READY TO WRITE | READY TO READ
* - READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE
* - TRANSFER_COMPLETE -> CONNECTED
- *
+ *
* @author Filip Hanik
* @version 1.0
*/
@@ -50,10 +51,11 @@
protected static org.apache.juli.logging.Log log =
org.apache.juli.logging.LogFactory.getLog(NioSender.class);
-
-
- protected Selector selector;
- protected SocketChannel socketChannel;
+
+
+ protected Selector selector;
+ protected SocketChannel socketChannel = null;
+ protected DatagramChannel dataChannel = null;
/*
* STATE VARIABLES *
@@ -64,14 +66,14 @@
protected XByteBuffer ackbuf = new XByteBuffer(128,true);
protected int remaining = 0;
protected boolean complete;
-
+
protected boolean connecting = false;
-
+
public NioSender() {
super();
-
+
}
-
+
/**
* State machine to send data
* @param key SelectionKey
@@ -89,7 +91,7 @@
completeConnect();
if ( current != null ) key.interestOps(key.interestOps() |
SelectionKey.OP_WRITE);
return false;
- } else {
+ } else {
//wait for the connection to finish
key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
return false;
@@ -146,8 +148,8 @@
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
socketChannel.socket().setTrafficClass(getSoTrafficClass());
}
-
-
+
+
protected boolean read(SelectionKey key) throws IOException {
//if there is no message here, we are done
@@ -171,7 +173,7 @@
}
}
-
+
protected boolean write(SelectionKey key) throws IOException {
if ( (!isConnected()) || (this.socketChannel==null)) {
throw new IOException("NioSender is not connected, this should not
occur.");
@@ -215,7 +217,7 @@
} else {
writebuf.clear();
}
-
+
InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort());
if ( socketChannel != null ) throw new IOException("Socket channel has
already been established. Connection might be in progress.");
socketChannel = SocketChannel.open();
@@ -227,7 +229,7 @@
socketChannel.register(getSelector(), SelectionKey.OP_CONNECT,
this);
}
}
-
+
/**
* disconnect
@@ -257,7 +259,7 @@
}
}
-
+
public void reset() {
if ( isConnected() && readbuf == null) {
readbuf = getReadBuffer();
@@ -273,10 +275,10 @@
setConnectTime(-1);
}
- private ByteBuffer getReadBuffer() {
+ private ByteBuffer getReadBuffer() {
return getBuffer(getRxBufSize());
}
-
+
private ByteBuffer getWriteBuffer() {
return getBuffer(getTxBufSize());
}
@@ -284,7 +286,7 @@
private ByteBuffer getBuffer(int size) {
return
(getDirectBuffer()?ByteBuffer.allocateDirect(size):ByteBuffer.allocate(size));
}
-
+
/**
* sendMessage
*
@@ -312,9 +314,9 @@
if (isConnected()) {
socketChannel.register(getSelector(), SelectionKey.OP_WRITE,
this);
}
- }
+ }
}
-
+
public byte[] getMessage() {
return current;
}
Modified:
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java?rev=628881&r1=628880&r2=628881&view=diff
==============================================================================
---
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
(original)
+++
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
Mon Feb 18 14:07:09 2008
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,7 @@
*/
package org.apache.catalina.tribes.test.channel;
+import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.group.GroupChannel;
import junit.framework.TestCase;
import org.apache.catalina.tribes.transport.ReceiverBase;
@@ -25,6 +26,7 @@
*/
public class ChannelStartStop extends TestCase {
GroupChannel channel = null;
+ int udpPort = 45543;
protected void setUp() throws Exception {
super.setUp();
channel = new GroupChannel();
@@ -34,7 +36,7 @@
super.tearDown();
try {channel.stop(channel.DEFAULT);}catch (Exception ignore){}
}
-
+
public void testDoubleFullStart() throws Exception {
int count = 0;
try {
@@ -52,11 +54,11 @@
public void testScrap() throws Exception {
System.out.println(channel.getChannelReceiver().getClass());
((ReceiverBase)channel.getChannelReceiver()).setMaxThreads(1);
- }
+ }
public void testDoublePartialStart() throws Exception {
- //try to double start the RX
+ //try to double start the RX
int count = 0;
try {
channel.start(channel.SND_RX_SEQ);
@@ -82,7 +84,7 @@
} catch ( Exception x){/*expected*/}
assertEquals(count,1);
channel.stop(channel.DEFAULT);
-
+
count = 0;
try {
channel.start(channel.SND_RX_SEQ);
@@ -107,7 +109,7 @@
assertEquals(count,1);
channel.stop(channel.DEFAULT);
}
-
+
public void testFalseOption() throws Exception {
int flag = 0xFFF0;//should get ignored by the underlying components
int count = 0;
@@ -121,6 +123,14 @@
} catch ( Exception x){/*expected*/}
assertEquals(count,2);
channel.stop(channel.DEFAULT);
+ }
+
+ public void testUdpReceiverStart() throws Exception {
+ ReceiverBase rb = (ReceiverBase)channel.getChannelReceiver();
+ rb.setUdpPort(udpPort);
+ channel.start(Channel.DEFAULT);
+ Thread.sleep(1000);
+ channel.stop(Channel.DEFAULT);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]