Author: fhanik Date: Wed May 3 11:48:01 2006 New Revision: 399382 URL: http://svn.apache.org/viewcvs?rev=399382&view=rev Log: Start levels are now respected correctly, this will make unit testing easier since we can shutdown and start different components to simulate errors
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=399382&r1=399381&r2=399382&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java Wed May 3 11:48:01 2006 @@ -30,9 +30,9 @@ * Start and stop sequences can be controlled by these constants */ public static final int DEFAULT = 15; - public static final int MBR_RX_SEQ = 1; + public static final int SND_RX_SEQ = 1; public static final int SND_TX_SEQ = 2; - public static final int SND_RX_SEQ = 4; + public static final int MBR_RX_SEQ = 4; public static final int MBR_TX_SEQ = 8; /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java?rev=399382&r1=399381&r2=399382&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java Wed May 3 11:48:01 2006 @@ -27,8 +27,8 @@ public interface MembershipService { - public static final int MBR_RX = 1; - public static final int MBR_TX = 2; + public static final int MBR_RX = Channel.MBR_RX_SEQ; + public static final int MBR_TX = Channel.MBR_TX_SEQ; /** * Sets the properties for the membership service. This must be called before @@ -52,17 +52,24 @@ /** * Starts the membership service. If a membership listeners is added * the listener will start to receive membership events. - * @param level - level 1 starts listening for members, level 2 + * @param level - level MBR_RX starts listening for members, level MBR_TX * starts broad casting the server * @throws java.lang.Exception if the service fails to start. + * @throws java.lang.IllegalArgumentException if the level is incorrect. */ public void start(int level) throws java.lang.Exception; /** - * Stops the membership service + * Starts the membership service. If a membership listeners is added + * the listener will start to receive membership events. + * @param level - level MBR_RX stops listening for members, level MBR_TX + * stops broad casting the server + * @throws java.lang.Exception if the service fails to stop + * @throws java.lang.IllegalArgumentException if the level is incorrect. */ - public void stop(); + + public void stop(int level); /** * Returns that cluster has members. Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=399382&r1=399381&r2=399382&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Wed May 3 11:48:01 2006 @@ -41,10 +41,13 @@ private ChannelReceiver clusterReceiver = new NioReceiver(); private ChannelSender clusterSender = new ReplicationTransmitter(); private MembershipService membershipService = new McastService(); - private boolean started = false; + //override optionflag protected int optionFlag = Channel.SEND_OPTIONS_BYTE_MESSAGE|Channel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; + public int getOptionFlag() {return optionFlag;} + public void setOptionFlag(int flag) {optionFlag=flag;} + private int startLevel = 0; public ChannelCoordinator() { @@ -70,7 +73,7 @@ if ( destination == null ) destination = membershipService.getMembers(); clusterSender.sendMessage(msg,destination); } - + /** * Starts up the channel. This can be called multiple times for individual services to start @@ -84,7 +87,7 @@ * @throws ChannelException if a startup error occurs or the service is already started. */ public void start(int svc) throws ChannelException { - this.start(); + this.internalStart(svc); } /** @@ -99,7 +102,7 @@ * @throws ChannelException if a startup error occurs or the service is already started. */ public void stop(int svc) throws ChannelException { - this.stop(); + this.internalStop(svc); } @@ -114,20 +117,38 @@ * SND_RX_SEQ - starts the replication receiver<BR> * @throws ChannelException if a startup error occurs or the service is already started. */ - protected synchronized void start() throws ChannelException { + protected synchronized void internalStart(int svc) throws ChannelException { try { - if (started) return; + boolean valid = false; + if (startLevel == Channel.DEFAULT) return; + //must start the receiver first so that we can coordinate the port it //listens to with the local membership settings - clusterReceiver.setMessageListener(this); - clusterReceiver.start(); - clusterSender.start(); - //synchronize, big time FIXME - membershipService.setMembershipListener(this); - membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort()); - membershipService.start(MembershipService.MBR_RX); - membershipService.start(MembershipService.MBR_TX); - this.started = true; + if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) { + clusterReceiver.setMessageListener(this); + clusterReceiver.start(); + //synchronize, big time FIXME + membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort()); + valid = true; + } + if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) { + clusterSender.start(); + valid = true; + } + + if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) { + membershipService.setMembershipListener(this); + membershipService.start(MembershipService.MBR_RX); + valid = true; + } + if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) { + membershipService.start(MembershipService.MBR_TX); + valid = true; + } + if ( !valid) { + throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ"); + } + startLevel = (startLevel | svc); }catch ( ChannelException cx ) { throw cx; }catch ( Exception x ) { @@ -146,17 +167,40 @@ * SND_RX_SEQ - starts the replication receiver<BR> * @throws ChannelException if a startup error occurs or the service is already started. */ - protected synchronized void stop() throws ChannelException { + protected synchronized void internalStop(int svc) throws ChannelException { try { - if ( !started ) return; - membershipService.stop(); - clusterReceiver.stop(); - clusterSender.stop(); - membershipService.stop(); + boolean valid = false; + if ( startLevel == 0 ) return; + if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) { + clusterReceiver.stop(); + clusterReceiver.setMessageListener(null); + valid = true; + } + if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) { + clusterSender.stop(); + valid = true; + } + + if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) { + membershipService.stop(MembershipService.MBR_RX); + membershipService.setMembershipListener(null); + valid = true; + + } + if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) { + valid = true; + membershipService.stop(MembershipService.MBR_TX); + } + if ( !valid) { + throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ"); + } + + startLevel = (startLevel & (~svc)); + }catch ( Exception x ) { throw new ChannelException(x); } finally { - started = false; + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java?rev=399382&r1=399381&r2=399382&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java Wed May 3 11:48:01 2006 @@ -205,7 +205,7 @@ * @param name The property to check for */ protected void hasProperty(Properties properties, String name){ - if ( properties.getProperty(name)==null) throw new IllegalArgumentException("Required property \""+name+"\" is missing."); + if ( properties.getProperty(name)==null) throw new IllegalArgumentException("McastService:Required property \""+name+"\" is missing."); } /** @@ -218,6 +218,14 @@ } public void start(int level) throws java.lang.Exception { + hasProperty(properties,"mcastPort"); + hasProperty(properties,"mcastAddress"); + hasProperty(properties,"mcastClusterDomain"); + hasProperty(properties,"memberDropTime"); + hasProperty(properties,"mcastFrequency"); + hasProperty(properties,"tcpListenPort"); + hasProperty(properties,"tcpListenHost"); + if ( impl != null ) { impl.start(level); return; @@ -279,13 +287,12 @@ /** * Stop broadcasting and listening to membership pings */ - public void stop() { + public void stop(int svc) { try { - if ( impl != null) impl.stop(); + if ( impl != null && impl.stop(svc) ) impl = null; } catch ( Exception x) { - log.error("Unable to stop the mcast service.",x); + log.error("Unable to stop the mcast service, level:"+svc+".",x); } - impl = null; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=399382&r1=399381&r2=399382&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Wed May 3 11:48:01 2006 @@ -26,6 +26,7 @@ import java.util.Arrays; import java.net.SocketTimeoutException; import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.Channel; /** * A <b>membership</b> implementation using simple multicast. @@ -45,7 +46,9 @@ /** * Internal flag used for the listen thread that listens to the multicasting socket. */ - protected boolean doRun = false; + protected boolean doRunSender = false; + protected boolean doRunReceiver = false; + protected int startLevel = 0; /** * Socket that we intend to listen to */ @@ -183,44 +186,70 @@ * @throws IllegalStateException if the service is already started */ public synchronized void start(int level) throws IOException { - if ( sender != null && receiver != null ) throw new IllegalStateException("Service already running."); - if ( level == 1 ) { + boolean valid = false; + if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) { + if ( receiver != null ) throw new IllegalStateException("McastService.receive already running."); socket.joinGroup(address); - doRun = true; + doRunReceiver = true; receiver = new ReceiverThread(); receiver.setDaemon(true); receiver.start(); - } - if ( level==2 ) { + valid = true; + } + if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) { + if ( sender != null ) throw new IllegalStateException("McastService.send already running."); + doRunSender = true; serviceStartTime = System.currentTimeMillis(); sender = new SenderThread(sendFrequency); sender.setDaemon(true); sender.start(); - + valid = true; + } + if (!valid) { + throw new IllegalArgumentException("Invalid start level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ"); } + startLevel = (startLevel | level); } /** * Stops the service * @throws IOException if the service fails to disconnect from the sockets */ - public synchronized void stop() throws IOException { - doRun = false; - sender.interrupt(); - receiver.interrupt(); - sender = null; - receiver = null; - //send a stop message - byte[] payload = member.getPayload(); - member.setPayload(Member.SHUTDOWN_PAYLOAD); - member.getData(true,true); - send(); - //restore payload - member.setPayload(payload); - member.getData(true,true); - //leave mcast group - socket.leaveGroup(address); - serviceStartTime = Long.MAX_VALUE; + public synchronized boolean stop(int level) throws IOException { + boolean valid = false; + + if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) { + valid = true; + doRunReceiver = false; + receiver.interrupt(); + receiver = null; + } + if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) { + valid = true; + doRunSender = false; + sender.interrupt(); + sender = null; + } + + if (!valid) { + throw new IllegalArgumentException("Invalid stop level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ"); + } + startLevel = (startLevel & (~level)); + //we're shutting down, send a shutdown message and close the socket + if ( startLevel == 0 ) { + //send a stop message + byte[] payload = member.getPayload(); + member.setPayload(Member.SHUTDOWN_PAYLOAD); + member.getData(true, true); + send(); + //restore payload + member.setPayload(payload); + member.getData(true, true); + //leave mcast group + socket.leaveGroup(address); + serviceStartTime = Long.MAX_VALUE; + } + return (startLevel == 0); } /** @@ -293,7 +322,7 @@ setName("Cluster-MembershipReceiver"); } public void run() { - while ( doRun ) { + while ( doRunReceiver ) { try { receive(); } catch ( Exception x ) { @@ -313,7 +342,7 @@ } public void run() { - while ( doRun ) { + while ( doRunSender ) { try { send(); } catch ( Exception x ) { --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]