Author: fhanik
Date: Mon May 22 15:39:06 2006
New Revision: 408775
URL: http://svn.apache.org/viewvc?rev=408775&view=rev
Log:
Membership arrival and disappearance notifications should never be blocked
because an interceptor or the application is holding on to the thread.
These are rare events, so we don't need a thread pool; instead, each event is
fired off on a new thread.
Added:
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=408775&r1=408774&r2=408775&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
Mon May 22 15:39:06 2006
@@ -197,12 +197,6 @@
receiver.setDaemon(true);
receiver.start();
valid = true;
- long memberwait = sendFrequency*4;
- if(log.isInfoEnabled())
- log.info("Sleeping for "+memberwait+" milliseconds to
establish cluster membership");
- try {Thread.sleep(memberwait);}catch (InterruptedException
ignore){}
- if(log.isInfoEnabled())
- log.info("Done sleeping, membership established.");
}
if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) {
if ( sender != null ) throw new
IllegalStateException("McastService.send already running.");
@@ -214,14 +208,26 @@
sender = new SenderThread(sendFrequency);
sender.setDaemon(true);
sender.start();
+ //we have started the receiver, but not yet waited for membership
to establish
valid = true;
}
if (!valid) {
throw new IllegalArgumentException("Invalid start level. Only
acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
}
+ //pause, once or twice
+ waitForMembers(level);
startLevel = (startLevel | level);
}
+ private void waitForMembers(int level) {
+ long memberwait = sendFrequency*4;
+ if(log.isInfoEnabled())
+ log.info("Sleeping for "+memberwait+" milliseconds to establish
cluster membership, start level:"+level);
+ try {Thread.sleep(memberwait);}catch (InterruptedException ignore){}
+ if(log.isInfoEnabled())
+ log.info("Done sleeping, membership established, start
level:"+level);
+ }
+
/**
* Stops the service
* @throws IOException if the service fails to disconnect from the sockets
@@ -272,19 +278,27 @@
socket.receive(receivePacket);
byte[] data = new byte[receivePacket.getLength()];
System.arraycopy(receivePacket.getData(),
receivePacket.getOffset(), data, 0, data.length);
- MemberImpl m = MemberImpl.getMember(data);
+ final MemberImpl m = MemberImpl.getMember(data);
if (log.isDebugEnabled())
log.debug("Mcast receive ping from member " + m);
-
- if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) {
- if (log.isDebugEnabled()) log.debug("Member has shutdown:" +
m);
- membership.removeMember(m);
- service.memberDisappeared(m);
- } else if (membership.memberAlive(m)) {
- if (log.isDebugEnabled())
- log.debug("Mcast add member " + m);
- service.memberAdded(m);
- } //end if
+ Thread t = null;
+ if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) {
+ if (log.isDebugEnabled()) log.debug("Member has shutdown:"
+ m);
+ membership.removeMember(m);
+ t = new Thread() {
+ public void run() {
+ service.memberDisappeared(m);
+ }
+ };
+ } else if (membership.memberAlive(m)) {
+ if (log.isDebugEnabled()) log.debug("Mcast add member " +
m);
+ t = new Thread() {
+ public void run() {
+ service.memberAdded(m);
+ }
+ };
+ } //end if
+ if ( t != null ) t.start();
} catch (SocketTimeoutException x ) {
//do nothing, this is normal, we don't want to block forever
//since the receive thread is the same thread
@@ -298,10 +312,16 @@
synchronized (expiredMutex) {
MemberImpl[] expired = membership.expire(timeToExpiration);
for (int i = 0; i < expired.length; i++) {
+ final MemberImpl member = expired[i];
if (log.isDebugEnabled())
log.debug("Mcast exipre member " + expired[i]);
try {
- service.memberDisappeared(expired[i]);
+ Thread t = new Thread() {
+ public void run() {
+ service.memberDisappeared(member);
+ }
+ };
+ t.start();
} catch (Exception x) {
log.error("Unable to process member disappeared message.",
x);
}
Added:
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java?rev=408775&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
Mon May 22 15:39:06 2006
@@ -0,0 +1,100 @@
+package org.apache.catalina.tribes.test.membership;
+
+import java.util.ArrayList;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.GroupChannel;
+import junit.framework.TestCase;
+
+public class TestMemberArrival
+ extends TestCase {
+ private static int count = 10;
+ private ManagedChannel[] channels = new ManagedChannel[count];
+ private TestMbrListener[] listeners = new TestMbrListener[count];
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ for (int i = 0; i < channels.length; i++) {
+ channels[i] = new GroupChannel();
+ channels[i].getMembershipService().setPayload( ("Channel-" + (i +
1)).getBytes("ASCII"));
+ listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
+ channels[i].addMembershipListener(listeners[i]);
+
+ }
+ }
+
+ public void clear() {
+ for (int i = 0; i < channels.length; i++) {
+ listeners[i].members.clear();
+ }
+ }
+
+ public void testMemberArrival() throws Exception {
+ //purpose of this test is to make sure that we have received all the
members
+ //that we can expect before the start method returns
+ Thread[] threads = new Thread[channels.length];
+ for (int i=0; i<channels.length; i++ ) {
+ final Channel channel = channels[i];
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ channel.start(Channel.DEFAULT);
+ }catch ( Exception x ) {
+ throw new RuntimeException(x);
+ }
+ }
+ };
+ threads[i] = t;
+ }
+ for (int i=0; i<threads.length; i++ ) threads[i].start();
+ for (int i=0; i<threads.length; i++ ) threads[i].join();
+ System.out.println("All channels started.");
+ for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking
member arrival length",channels.length-1,listeners[i].members.size());
+ }
+
+ protected void tearDown() throws Exception {
+
+ for (int i = 0; i < channels.length; i++) {
+ try {
+ channels[i].stop(Channel.DEFAULT);
+ } catch (Exception ignore) {}
+ }
+ super.tearDown();
+ }
+
+ public class TestMbrListener
+ implements MembershipListener {
+ public String name = null;
+ public TestMbrListener(String name) {
+ this.name = name;
+ }
+
+ public ArrayList members = new ArrayList();
+ public void memberAdded(Member member) {
+ if (!members.contains(member)) {
+ members.add(member);
+ try {
+ System.out.println(name + ":member added[" + new
String(member.getPayload(), "ASCII") + ";
Thread:"+Thread.currentThread().getName()+"]");
+ } catch (Exception x) {
+ System.out.println(name + ":member added[unknown]");
+ }
+ }
+ }
+
+ public void memberDisappeared(Member member) {
+ if (members.contains(member)) {
+ members.remove(member);
+ try {
+ System.out.println(name + ":member disappeared[" + new
String(member.getPayload(), "ASCII") + ";
Thread:"+Thread.currentThread().getName()+"]");
+ } catch (Exception x) {
+ System.out.println(name + ":member disappeared[unknown]");
+ }
+ }
+ }
+
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]