Author: fhanik
Date: Mon Feb 18 16:57:54 2008
New Revision: 628940
URL: http://svn.apache.org/viewvc?rev=628940&view=rev
Log:
more UDP code
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/Member.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
Modified: tomcat/trunk/java/org/apache/catalina/tribes/Member.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Member.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/Member.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/Member.java Mon Feb 18 16:57:54 2008
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,7 +24,7 @@
* The host is what interface the member is listening to, to receive data<br>
* The port is what port the member is listening to, to receive data<br>
* The uniqueId defines the session id for the member. This is an important
feature
- * since a member that has crashed and the starts up again on the same
port/host is
+ * since a member that has crashed and the starts up again on the same
port/host is
* not guaranteed to be the same member, so no state transfers will ever be
confused
* @author Filip Hanik
* @version $Revision$, $Date$
@@ -32,18 +32,18 @@
public interface Member {
-
+
/**
* When a member leaves the cluster, the payload of the memberDisappeared
member
* will be the following bytes. This indicates a soft shutdown, and not a
crash
*/
public static final byte[] SHUTDOWN_PAYLOAD = new byte[] {66, 65, 66, 89,
45, 65, 76, 69, 88};
-
+
/**
* Returns the name of this node, should be unique within the group.
*/
public String getName();
-
+
/**
* Returns the listen host for the ChannelReceiver implementation
* @return IPv4 or IPv6 representation of the host address this member
listens to incoming data
@@ -57,7 +57,7 @@
* @see ChannelReceiver
*/
public int getPort();
-
+
/**
* Returns the secure listen port for the ChannelReceiver implementation.
* Returns -1 if its not listening to a secure port.
@@ -65,7 +65,13 @@
* @see ChannelReceiver
*/
public int getSecurePort();
-
+
+ /**
+ * Returns the UDP port that this member is listening to for UDP messages.
+ * @return the listen UDP port for this member, -1 if it is not listening on a UDP port
+ */
+ public int getUdpPort();
+
/**
* Contains information on how long this member has been online.
@@ -74,7 +80,7 @@
* @return nr of milliseconds since this member started.
*/
public long getMemberAliveTime();
-
+
/**
* The current state of the member
* @return boolean - true if the member is functioning correctly
@@ -85,32 +91,32 @@
* @return boolean - true if the member is suspect, but the crash has not
been confirmed
*/
public boolean isSuspect();
-
+
/**
- *
- * @return boolean - true if the member has been confirmed to malfunction
+ *
+ * @return boolean - true if the member has been confirmed to malfunction
*/
public boolean isFailing();
-
+
/**
* returns a UUID unique for this member over all sessions.
* If the member crashes and restarts, the uniqueId will be different.
* @return byte[]
*/
public byte[] getUniqueId();
-
+
/**
* returns the payload associated with this member
* @return byte[]
*/
public byte[] getPayload();
-
+
/**
* returns the command associated with this member
* @return byte[]
*/
public byte[] getCommand();
-
+
/**
* Domain for this cluster
* @return byte[]
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java Mon Feb 18 16:57:54 2008
@@ -116,7 +116,8 @@
*/
public boolean keepalive() {
boolean disconnect = false;
- if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) disconnect =
true;
+ if (isUdpBased()) disconnect = true; //always disconnect UDP, TODO
optimize the keepalive handling
+ else if ( keepAliveCount >= 0 && requestCount>keepAliveCount )
disconnect = true;
else if ( keepAliveTime >= 0 &&
(System.currentTimeMillis()-connectTime)>keepAliveTime ) disconnect = true;
if ( disconnect ) disconnect();
return disconnect;
@@ -299,6 +300,7 @@
this.destination = destination;
this.address = InetAddress.getByAddress(destination.getHost());
this.port = destination.getPort();
+ this.udpPort = destination.getUdpPort();
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Mon Feb 18 16:57:54 2008
@@ -17,6 +17,7 @@
package org.apache.catalina.tribes.transport;
import java.io.IOException;
+import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -220,6 +221,38 @@
}
return retries;
}
+
+ /**
+ * Same as bind() except it does it for the UDP port: tries portstart, then each following port.
+ * @param socket the datagram socket to bind to the getBind() address
+ * @param portstart the first port to try; the port that binds is stored with setUdpPort()
+ * @param retries how many ports to try; if not positive, no bind is attempted
+ * @return 0 once the socket is bound, or the unchanged retries value if it was not positive
+ * @throws IOException the last bind failure, once all attempts have been used
+ */
+ protected int bindUdp(DatagramSocket socket, int portstart, int retries) throws IOException {
+ InetSocketAddress addr = null;
+ while ( retries > 0 ) {
+ try {
+ addr = new InetSocketAddress(getBind(), portstart);
+ socket.bind(addr);
+ setUdpPort(portstart);
+ log.info("UDP Receiver Server Socket bound to:"+addr);
+ return 0;
+ }catch ( IOException x) {
+ retries--;
+ if ( retries <= 0 ) {
+ log.info("Unable to bind UDP socket to:"+addr+" throwing error.");
+ throw x;
+ }
+ portstart++;
+ try {Thread.sleep(25);}catch( InterruptedException ti){Thread.currentThread().interrupt();} //restore the flag; interrupted() would clear it
+ retries = bindUdp(socket,portstart,retries); //returns 0 on success, which also ends this loop
+ }
+ }
+ return retries;
+ }
+
public void messageDataReceived(ChannelMessage data) {
if ( this.listener != null ) {
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java Mon Feb 18 16:57:54 2008
@@ -144,10 +144,7 @@
datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
//bind to the address to avoid security checks
- InetSocketAddress daddr = new
InetSocketAddress(getBind(),getUdpPort());
- //TODO should we auto increment the UDP port to avoid collisions?
- //we could auto increment with the offset from the tcp listen port
- datagramChannel.connect(daddr);
+ bindUdp(datagramChannel.socket(),getUdpPort(),getAutoBind());
}
@@ -188,7 +185,10 @@
}
key.cancel();
key.attach(null);
- try { ((SocketChannel)key.channel()).socket().close(); } catch
(IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
+ if (key.channel() instanceof SocketChannel)
+ try { ((SocketChannel)key.channel()).socket().close(); } catch
(IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
+ if (key.channel() instanceof DatagramChannel)
+ try { ((DatagramChannel)key.channel()).socket().close(); } catch
(Exception e) { if (log.isDebugEnabled()) log.debug("", e); }
try { key.channel().close(); } catch (IOException e) { if
(log.isDebugEnabled()) log.debug("", e); }
}
@@ -249,7 +249,7 @@
setListen(true);
if (selector!=null && datagramChannel!=null) {
- ObjectReader oreader = new ObjectReader(1024*65);
+ ObjectReader oreader = new ObjectReader(65535); //max size for a
datagram packet
registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader);
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Mon Feb 18 16:57:54 2008
@@ -17,6 +17,7 @@
package org.apache.catalina.tribes.transport.nio;
import java.io.IOException;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ReadableByteChannel;
@@ -147,19 +148,33 @@
reader.setLastAccess(System.currentTimeMillis());
reader.access();
ReadableByteChannel channel = (ReadableByteChannel) key.channel();
- int count;
+ int count=-1;
buffer.clear(); // make buffer empty
+ SocketAddress saddr = null;
- // loop while data available, channel is non-blocking
- while ((count = channel.read (buffer)) > 0) {
- buffer.flip(); // make buffer readable
+ if (channel instanceof SocketChannel) {
+ // loop while data available, channel is non-blocking
+ while ((count = channel.read (buffer)) > 0) {
+ buffer.flip(); // make buffer readable
+ if ( buffer.hasArray() )
+ reader.append(buffer.array(),0,count,false);
+ else
+ reader.append(buffer,count,false);
+ buffer.clear(); // make buffer empty
+ //do we have at least one package?
+ if ( reader.hasPackage() ) break;
+ }
+ } else if (channel instanceof DatagramChannel) {
+ DatagramChannel dchannel = (DatagramChannel)channel;
+ saddr = dchannel.receive(buffer);
+ buffer.flip(); // make buffer readable
if ( buffer.hasArray() )
- reader.append(buffer.array(),0,count,false);
+
reader.append(buffer.array(),0,buffer.limit()-buffer.position(),false);
else
- reader.append(buffer,count,false);
- buffer.clear(); // make buffer empty
- //do we have at least one package?
- if ( reader.hasPackage() ) break;
+ reader.append(buffer,buffer.limit()-buffer.position(),false);
+ buffer.clear(); // make buffer empty
+ //did we get a package
+ count = reader.hasPackage()?1:-1;
}
int pkgcnt = reader.count();
@@ -180,7 +195,7 @@
* server before completing the request
* This is considered an asynchronized request
*/
- if (ChannelData.sendAckAsync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND);
+ if (ChannelData.sendAckAsync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND,saddr);
try {
if ( Logs.MESSAGES.isTraceEnabled() ) {
try {
@@ -194,13 +209,13 @@
* server before sending the ack to the remote server
* This is considered a synchronized request
*/
- if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND);
+ if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND,saddr);
}catch ( RemoteProcessException e ) {
if ( log.isDebugEnabled() ) log.error("Processing of cluster
message failed.",e);
- if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND);
+ if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND,saddr);
}catch ( Exception e ) {
log.error("Processing of cluster message failed.",e);
- if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND);
+ if (ChannelData.sendAckSync(msgs[i].getOptions()))
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND,saddr);
}
if ( getUseBufferPool() ) {
BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
@@ -275,17 +290,25 @@
/**
- * send a reply-acknowledgement (6,2,3)
+ * send a reply-acknowledgement (6,2,3), sends it doing a busy write, the
ACK is so small
+ * that it should always go to the buffer
* @param key
* @param channel
*/
- protected void sendAck(SelectionKey key, WritableByteChannel channel,
byte[] command) {
-
+ protected void sendAck(SelectionKey key, WritableByteChannel channel,
byte[] command, SocketAddress udpaddr) {
try {
+
ByteBuffer buf = ByteBuffer.wrap(command);
int total = 0;
- while ( total < command.length ) {
- total += channel.write(buf);
+ if (channel instanceof DatagramChannel) {
+ DatagramChannel dchannel = (DatagramChannel)channel;
+ while ( total < command.length ) {
+ total += dchannel.send(buf, udpaddr);
+ }
+ } else {
+ while ( total < command.length ) {
+ total += channel.write(buf);
+ }
}
if (log.isTraceEnabled()) {
log.trace("ACK sent to " +
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java Mon Feb 18 16:57:54 2008
@@ -154,7 +154,7 @@
protected boolean read(SelectionKey key) throws IOException {
//if there is no message here, we are done
if ( current == null ) return true;
- int read = socketChannel.read(readbuf);
+ int read = isUdpBased()?dataChannel.read(readbuf) :
socketChannel.read(readbuf);
//end of stream
if ( read == -1 ) throw new IOException("Unable to receive an ack
message. EOF on socket channel has been reached.");
//no data read
@@ -175,14 +175,14 @@
protected boolean write(SelectionKey key) throws IOException {
- if ( (!isConnected()) || (this.socketChannel==null)) {
+ if ( (!isConnected()) || (this.socketChannel==null &&
this.dataChannel==null)) {
throw new IOException("NioSender is not connected, this should not
occur.");
}
if ( current != null ) {
if ( remaining > 0 ) {
//weve written everything, or we are starting a new package
//protect against buffer overwrite
- int byteswritten = socketChannel.write(writebuf);
+ int byteswritten = isUdpBased()?dataChannel.write(writebuf) :
socketChannel.write(writebuf);
if (byteswritten == -1 ) throw new EOFException();
remaining -= byteswritten;
//if the entire message was written from the buffer
@@ -204,7 +204,7 @@
* @todo Implement this org.apache.catalina.tribes.transport.IDataSender
method
*/
public synchronized void connect() throws IOException {
- if ( connecting ) return;
+ if ( connecting || isConnected()) return;
connecting = true;
if ( isConnected() ) throw new IOException("NioSender is already in
connected state.");
if ( readbuf == null ) {
@@ -218,15 +218,23 @@
writebuf.clear();
}
- InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort());
- if ( socketChannel != null ) throw new IOException("Socket channel has
already been established. Connection might be in progress.");
- socketChannel = SocketChannel.open();
- socketChannel.configureBlocking(false);
- if ( socketChannel.connect(addr) ) {
- completeConnect();
- socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
+ if (isUdpBased()) {
+ InetSocketAddress daddr = new
InetSocketAddress(getAddress(),getUdpPort());
+ if ( dataChannel != null ) throw new IOException("Datagram channel
has already been established. Connection might be in progress.");
+ dataChannel = DatagramChannel.open();
+ dataChannel.configureBlocking(false);
+ dataChannel.connect(daddr);
} else {
- socketChannel.register(getSelector(), SelectionKey.OP_CONNECT,
this);
+ InetSocketAddress addr = new
InetSocketAddress(getAddress(),getPort());
+ if ( socketChannel != null ) throw new IOException("Socket channel
has already been established. Connection might be in progress.");
+ socketChannel = SocketChannel.open();
+ socketChannel.configureBlocking(false);
+ if ( socketChannel.connect(addr) ) {
+ completeConnect();
+ socketChannel.register(getSelector(), SelectionKey.OP_WRITE,
this);
+ } else {
+ socketChannel.register(getSelector(), SelectionKey.OP_CONNECT,
this);
+ }
}
}
@@ -252,6 +260,18 @@
socketChannel = null;
}
}
+ if ( dataChannel != null ) {
+ try {
+ try {dataChannel.socket().close();}catch ( Exception x){}
+ //error free close, all the way
+ //try {socket.shutdownOutput();}catch ( Exception x){}
+ //try {socket.shutdownInput();}catch ( Exception x){}
+ //try {socket.close();}catch ( Exception x){}
+ try {dataChannel.close();}catch ( Exception x){}
+ }finally {
+ dataChannel = null;
+ }
+ }
} catch ( Exception x ) {
log.error("Unable to disconnect NioSender. msg="+x.getMessage());
if ( log.isDebugEnabled() ) log.debug("Unable to disconnect
NioSender. msg="+x.getMessage(),x);
@@ -273,6 +293,7 @@
setAttempt(0);
setRequestCount(0);
setConnectTime(-1);
+ setUdpBased(false);
}
private ByteBuffer getReadBuffer() {
@@ -312,7 +333,10 @@
//writebuf.limit(length);
writebuf.flip();
if (isConnected()) {
- socketChannel.register(getSelector(), SelectionKey.OP_WRITE,
this);
+ if (isUdpBased())
+ dataChannel.register(getSelector(), SelectionKey.OP_WRITE,
this);
+ else
+ socketChannel.register(getSelector(),
SelectionKey.OP_WRITE, this);
}
}
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Mon Feb 18 16:57:54 2008
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -48,7 +48,7 @@
* @version 1.0
*/
public class ParallelNioSender extends AbstractSender implements
MultiPointSender {
-
+
protected static org.apache.juli.logging.Log log =
org.apache.juli.logging.LogFactory.getLog(ParallelNioSender.class);
protected long selectTimeout = 5000; //default 5 seconds, same as send
timeout
protected Selector selector;
@@ -58,15 +58,16 @@
selector = Selector.open();
setConnected(true);
}
-
-
+
+
public synchronized void sendMessage(Member[] destination, ChannelMessage
msg) throws ChannelException {
long start = System.currentTimeMillis();
+ this.setUdpBased((msg.getOptions()&Channel.SEND_OPTIONS_UDP) ==
Channel.SEND_OPTIONS_UDP);
byte[] data = XByteBuffer.createDataPackage((ChannelData)msg);
NioSender[] senders = setupForSend(destination);
connect(senders);
setData(senders,data);
-
+
int remaining = senders.length;
ChannelException cx = null;
try {
@@ -108,17 +109,17 @@
if ( x instanceof ChannelException ) throw (ChannelException)x;
else throw new ChannelException(x);
}
-
+
}
-
+
private int doLoop(long selectTimeOut, int maxAttempts, boolean
waitForAck, ChannelMessage msg) throws IOException, ChannelException {
int completed = 0;
int selectedKeys = selector.select(selectTimeOut);
-
+
if (selectedKeys == 0) {
return 0;
}
-
+
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey sk = (SelectionKey) it.next();
@@ -140,16 +141,16 @@
int attempt = sender.getAttempt()+1;
boolean retry = (sender.getAttempt() <= maxAttempts &&
maxAttempts>0);
synchronized (state) {
-
+
//sk.cancel();
if (state.isSuspect()) state.setFailing();
if (state.isReady()) {
state.setSuspect();
- if ( retry )
+ if ( retry )
log.warn("Member send is failing for:" +
sender.getDestination().getName() +" ; Setting to suspect and retrying.");
- else
+ else
log.warn("Member send is failing for:" +
sender.getDestination().getName() +" ; Setting to suspect.", x);
- }
+ }
}
if ( !isConnected() ) {
log.warn("Not retrying send for:" +
sender.getDestination().getName() + "; Sender is disconnected.");
@@ -157,11 +158,11 @@
cx.addFaultyMember(sender.getDestination(),x);
throw cx;
}
-
+
byte[] data = sender.getMessage();
if ( retry ) {
- try {
- sender.disconnect();
+ try {
+ sender.disconnect();
sender.connect();
sender.setAttempt(attempt);
sender.setMessage(data);
@@ -178,12 +179,12 @@
return completed;
}
-
+
private void connect(NioSender[] senders) throws ChannelException {
ChannelException x = null;
for (int i=0; i<senders.length; i++ ) {
try {
- if (!senders[i].isConnected()) senders[i].connect();
+ senders[i].connect();
}catch ( IOException io ) {
if ( x==null ) x = new ChannelException(io);
x.addFaultyMember(senders[i].getDestination(),io);
@@ -191,7 +192,7 @@
}
if ( x != null ) throw x;
}
-
+
private void setData(NioSender[] senders, byte[] data) throws
ChannelException {
ChannelException x = null;
for (int i=0; i<senders.length; i++ ) {
@@ -204,8 +205,8 @@
}
if ( x != null ) throw x;
}
-
-
+
+
private NioSender[] setupForSend(Member[] destination) throws
ChannelException {
ChannelException cx = null;
NioSender[] result = new NioSender[destination.length];
@@ -222,6 +223,7 @@
sender.reset();
sender.setDestination(destination[i]);
sender.setSelector(selector);
+ sender.setUdpBased(isUdpBased());
result[i] = sender;
}
}catch ( UnknownHostException x ) {
@@ -232,13 +234,13 @@
if ( cx != null ) throw cx;
else return result;
}
-
+
public void connect() {
//do nothing, we connect on demand
setConnected(true);
}
-
-
+
+
private synchronized void close() throws ChannelException {
ChannelException x = null;
Object[] members = nioSenders.keySet().toArray();
@@ -255,24 +257,24 @@
}
if ( x != null ) throw x;
}
-
+
public void add(Member member) {
-
+
}
-
+
public void remove(Member member) {
//disconnect senders
NioSender sender = (NioSender)nioSenders.remove(member);
if ( sender != null ) sender.disconnect();
}
-
+
public synchronized void disconnect() {
setConnected(false);
try {close(); }catch (Exception x){}
-
+
}
-
+
public void finalize() {
try {disconnect(); }catch ( Exception ignore){}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]