Author: fhanik Date: Tue May 2 13:30:17 2006 New Revision: 399039 URL: http://svn.apache.org/viewcvs?rev=399039&view=rev Log: Refactored the sender so that its easy to transfer properties, and not missing important settings Moved shutdown payload to the Member interface so that an app can differentiate between shutdown and crash
Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml (original) +++ tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml Tue May 2 13:30:17 2006 @@ -162,7 +162,8 @@ to the sender. This is a unique feature that adds an incredible amount value to the application developer. Most frameworks here will tell you that the message was delivered, and the application developer has to build in logic on whether the message was actually processed properly by the application - on the remote node. + on the remote node. If configured, Tribes will throw an exception when it receives an ACK_FAIL + and associate that exception with the member that didn't process the message. </li> </ol> You can of course write even more sophisticated guarantee levels, and some of them will be mentioned later on Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java Tue May 2 13:30:17 2006 @@ -31,6 +31,13 @@ public interface Member { + + /** + * When a member leaves the cluster, the payload of the memberDisappeared member + * will be the following bytes. + */ + public static final byte[] SHUTDOWN_PAYLOAD = new byte[] {66, 65, 66, 89, 45, 65, 76, 69, 88}; + /** * Return implementation specific properties about this cluster node. */ Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Tue May 2 13:30:17 2006 @@ -25,6 +25,7 @@ import org.apache.catalina.tribes.MembershipListener; import java.util.Arrays; import java.net.SocketTimeoutException; +import org.apache.catalina.tribes.Member; /** * A <b>membership</b> implementation using simple multicast. @@ -112,8 +113,6 @@ */ protected InetAddress mcastBindAddress = null; - protected static final byte[] STOP_PAYLOAD = new byte[] {66, 65, 66, 89, 45, 65, 76, 69, 88}; - /** * Create a new mcast service impl * @param member - the local member @@ -213,7 +212,7 @@ receiver = null; //send a stop message byte[] payload = member.getPayload(); - member.setPayload(STOP_PAYLOAD); + member.setPayload(Member.SHUTDOWN_PAYLOAD); member.getData(true,true); send(); //restore payload @@ -238,7 +237,7 @@ if (log.isDebugEnabled()) log.debug("Mcast receive ping from member " + m); - if (Arrays.equals(m.getPayload(), STOP_PAYLOAD)) { + if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) { if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m); membership.removeMcastMember(m); service.memberDisappeared(m); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java Tue May 2 13:30:17 2006 @@ -48,7 +48,7 @@ private Member destination; private InetAddress address; private int port; - private int maxRetryAttempts = 2;//zero resends + private int maxRetryAttempts = 1;//1 resends private int attempt; private boolean tcpNoDelay = true; private boolean soKeepAlive = false; @@ -58,22 +58,38 @@ private int soLingerTime = 3; private int soTrafficClass = 0x04 | 0x08 | 0x010; private boolean throwOnFailedAck = false; + + /** + * transfers sender properties from one sender to another + * @param from AbstractSender + * @param to AbstractSender + */ + public static void transferProperties(AbstractSender from, AbstractSender to) { + to.rxBufSize = from.rxBufSize; + to.txBufSize = from.txBufSize; + to.directBuffer = from.directBuffer; + to.keepAliveCount = from.keepAliveCount; + to.keepAliveTime = from.keepAliveTime; + to.timeout = from.timeout; + to.destination = from.destination; + to.address = from.address; + to.port = from.port; + to.maxRetryAttempts = from.maxRetryAttempts; + to.tcpNoDelay = from.tcpNoDelay; + to.soKeepAlive = from.soKeepAlive; + to.ooBInline = from.ooBInline; + to.soReuseAddress = from.soReuseAddress; + to.soLingerOn = from.soLingerOn; + to.soLingerTime = from.soLingerTime; + to.soTrafficClass = from.soTrafficClass; + to.throwOnFailedAck = from.throwOnFailedAck; + } + + public AbstractSender() { } - public AbstractSender(Member destination) throws UnknownHostException { - this.destination = destination; - this.address = InetAddress.getByAddress(destination.getHost()); - this.port = destination.getPort(); - } - - public AbstractSender(Member destination, int rxBufSize, int txBufSize) throws UnknownHostException { - this(destination); - this.rxBufSize = rxBufSize; - this.txBufSize = txBufSize; - } - /** * connect * @@ -268,4 +284,20 @@ public void setThrowOnFailedAck(boolean throwOnFailedAck) { this.throwOnFailedAck = throwOnFailedAck; } + + public void setDestination(Member destination) throws UnknownHostException { + this.destination = destination; + this.address = InetAddress.getByAddress(destination.getHost()); + this.port = destination.getPort(); + + } + + public void setPort(int port) { + this.port = port; + } + + public void setAddress(InetAddress address) { + this.address = address; + } + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java Tue May 2 13:30:17 2006 @@ -71,15 +71,9 @@ // ------------------------------------------------------------- Constructor - public BioSender(Member member) throws UnknownHostException { - super(member); - if (log.isDebugEnabled()) - log.debug(sm.getString("IDataSender.create",getAddress(), new Integer(getPort()))); + public BioSender() { } - public BioSender(Member member, int rxBufSize, int txBufSize) throws UnknownHostException { - super(member,rxBufSize,txBufSize); - } // ------------------------------------------------------------- Properties Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java Tue May 2 13:30:17 2006 @@ -57,12 +57,9 @@ try { BioSender sender = (BioSender) bioSenders.get(destination[i]); if (sender == null) { - sender = new BioSender(destination[i], getRxBufSize(), getTxBufSize()); - sender.setKeepAliveCount(getKeepAliveCount()); - sender.setKeepAliveTime(getKeepAliveTime()); - sender.setTimeout(getTimeout()); - sender.setMaxRetryAttempts(getMaxRetryAttempts()); - sender.setKeepAliveTime(getKeepAliveTime()); + sender = new BioSender(); + sender.transferProperties(this,sender); + sender.setDestination(destination[i]); bioSenders.put(destination[i], sender); } result[i] = sender; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java Tue May 2 13:30:17 2006 @@ -51,10 +51,7 @@ */ public DataSender getNewDataSender() { MultipointBioSender sender = new MultipointBioSender(); - sender.setTimeout(getTimeout()); - sender.setMaxRetryAttempts(getMaxRetryAttempts()); - sender.setRxBufSize(getRxBufSize()); - sender.setTxBufSize(getTxBufSize()); + sender.transferProperties(this,sender); return sender; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java Tue May 2 13:30:17 2006 @@ -66,8 +66,8 @@ protected boolean connecting = false; - public NioSender(Member destination) throws UnknownHostException { - super(destination); + public NioSender() { + super(); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Tue May 2 13:30:17 2006 @@ -202,25 +202,22 @@ NioSender[] result = new NioSender[destination.length]; for ( int i=0; i<destination.length; i++ ) { NioSender sender = (NioSender)nioSenders.get(destination[i]); - if ( sender == null ) { - try { - sender = new NioSender(destination[i]); + try { + + if (sender == null) { + sender = new NioSender(); + sender.transferProperties(this, sender); nioSenders.put(destination[i], sender); - }catch ( UnknownHostException x ) { - if ( cx == null ) cx = new ChannelException("Unable to setup NioSender.",x); - cx.addFaultyMember(destination[i],x); } - } - if ( sender != null ) { - sender.reset(); - sender.setSelector(selector); - sender.setDirectBuffer(getDirectBuffer()); - sender.setRxBufSize(getRxBufSize()); - sender.setTxBufSize(getTxBufSize()); - sender.setTimeout(getTimeout()); - sender.setKeepAliveCount(getKeepAliveCount()); - sender.setKeepAliveTime(getKeepAliveTime()); - result[i] = sender; + if (sender != null) { + sender.reset(); + sender.setDestination(destination[i]); + sender.setSelector(selector); + result[i] = sender; + } + }catch ( UnknownHostException x ) { + if (cx == null) cx = new ChannelException("Unable to setup NioSender.", x); + cx.addFaultyMember(destination[i], x); } } if ( cx != null ) throw cx; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java Tue May 2 13:30:17 2006 @@ -55,11 +55,7 @@ public DataSender getNewDataSender() { try { ParallelNioSender sender = new ParallelNioSender(); - sender.setTimeout(getTimeout()); - sender.setMaxRetryAttempts(getMaxRetryAttempts()); - sender.setDirectBuffer(getDirectBuffer()); - sender.setRxBufSize(getRxBufSize()); - sender.setTxBufSize(getTxBufSize()); + sender.transferProperties(this,sender); return sender; } catch ( IOException x ) { throw new RuntimeException("Unable to open NIO selector.",x); Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java?rev=399039&r1=399038&r2=399039&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java Tue May 2 13:30:17 2006 @@ -48,7 +48,8 @@ public void init() throws Exception { selector = Selector.open(); mbr = new MemberImpl("","localhost",4444,0); - NioSender sender = new NioSender(mbr); + NioSender sender = new NioSender(); + sender.setDestination(mbr); sender.setDirectBuffer(true); sender.setSelector(selector); sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr))); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]