Author: fhanik
Date: Tue Jun 13 23:13:24 2006
New Revision: 414100
URL: http://svn.apache.org/viewvc?rev=414100&view=rev
Log:
More modifications; there seems to be a synchronous block that runs through
the election campaign
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=414100&r1=414099&r2=414100&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
Tue Jun 13 23:13:24 2006
@@ -143,7 +143,7 @@
/**
* Time to wait for coordination timeout
*/
- protected long waitForCoordMsgTimeout = 5000;
+ protected long waitForCoordMsgTimeout = 15000;
/**
* Our current view
*/
@@ -191,7 +191,14 @@
this.handleViewConf(this.createElectionMsg(local,others,local),local,view);
return; //the only member, no need for an election
}
- if ( suggestedviewId != null ) return;//election already running,
I'm not allowed to have two of them
+ if ( suggestedviewId != null ) {
+ fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election
abandoned, election running"));
+ return; //election already running, I'm not allowed to have
two of them
+ }
+ if ( view != null && Arrays.diff(view,membership,local).length ==
0 && Arrays.diff(membership,view,local).length == 0) {
+ fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election
abandoned, view matches membership"));
+ return; //already have this view installed
+ }
int prio = AbsoluteOrder.comp.compare(local,others[0]);
MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader
in my view?
if ( local.equals(leader) || force ) {
@@ -300,13 +307,13 @@
}
protected void processCoordMessage(CoordinationMessage msg, Member sender)
throws ChannelException {
- synchronized (electionMutex) {
+// synchronized (electionMutex) {
coordMsgReceived.set(true);
msg.timestamp = System.currentTimeMillis();
Membership merged = mergeOnArrive(msg,sender);
if ( isViewConf(msg) ) handleViewConf(msg, sender,merged);
else handleToken(msg, sender, merged);
- }
+// }
}
protected void handleToken(CoordinationMessage msg, Member
sender,Membership merged) throws ChannelException {
@@ -433,24 +440,33 @@
// OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE
//============================================================================================================
public void start(int svc) throws ChannelException {
- if ( membership == null ) setupMembership();
- if (started) return;
- fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_START,this,"Before start"));
- super.start(startsvc);
- started = true;
- view = new
Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,true);
- fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_START,this,"After start"));
- startElection(false);
+// synchronized (electionMutex) {
+ if (membership == null) setupMembership();
+ if (started)return;
+ fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
+ super.start(startsvc);
+ started = true;
+ if (view == null) view = new Membership(
(MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true);
+ fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
+ startElection(false);
+// }
}
public void stop(int svc) throws ChannelException {
try {
halt();
- if ( !started ) return;
- started = false;
- fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_STOP,this,"Before stop"));
- super.stop(startsvc);
- fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_STOP,this,"After stop"));
+ synchronized (electionMutex) {
+ if (!started)return;
+ started = false;
+ fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
+ super.stop(startsvc);
+ this.view = null;
+ this.viewId = null;
+ this.suggestedView = null;
+ this.suggestedviewId = null;
+ this.membership.reset();
+ fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
+ }
}finally {
release();
}
@@ -486,6 +502,7 @@
}
public void memberAdded(Member member) {
+ System.out.println("MBR ADD THREAD:"+Thread.currentThread().getName());
memberAdded(member,true);
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=414100&r1=414099&r2=414100&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
Tue Jun 13 23:13:24 2006
@@ -170,7 +170,7 @@
if (membership == null) setupMembership();
//update all alive times
Member[] members = super.getMembers();
- for (int i = 0; i < members.length; i++) {
+ for (int i = 0; members != null && i < members.length; i++) {
if (membership.memberAlive( (MemberImpl) members[i])) {
//we don't have this one in our membership, check to see
if he/she is alive
if (memberAlive(members[i])) {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java?rev=414100&r1=414099&r2=414100&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
Tue Jun 13 23:13:24 2006
@@ -67,7 +67,7 @@
public void setTransport(MultiPointSender transport) {
this.transport = transport;
}
-
+
// ------------------------------------------------------------- public
/**
Modified:
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java?rev=414100&r1=414099&r2=414100&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
Tue Jun 13 23:13:24 2006
@@ -1,23 +1,26 @@
package org.apache.catalina.tribes.demos;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
+import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
import
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
-import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.catalina.tribes.util.Arrays;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.IOException;
-import java.util.StringTokenizer;
public class CoordinationDemo {
static int CHANNEL_COUNT = 5;
static int SCREEN_WIDTH = 120;
+ static long SLEEP_TIME = 10;
static boolean MULTI_THREAD = false;
StringBuffer statusLine = new StringBuffer();
Status[] status = null;
@@ -48,9 +51,9 @@
public synchronized void printScreen() {
clearScreen();
- System.out.println("XXX. "+getHeader());
+ System.out.println(" ###."+getHeader());
for ( int i=0; i<status.length; i++ ) {
- System.out.print(fill(String.valueOf(i+1)+".",5," "));
+ System.out.print(leftfill(String.valueOf(i+1)+".",5," "));
if ( status[i] != null )
System.out.print(status[i].getStatusLine());
}
System.out.println("\n\n");
@@ -61,19 +64,20 @@
public String getHeader() {
//member - 30
- //running- 8
+ //running- 10
//coord - 30
//view-id - 24
//view count - 8
StringBuffer buf = new StringBuffer();
- buf.append(fill("Member",30," "));
- buf.append(fill("Running",8," "));
- buf.append(fill("Coord",30," "));
- buf.append(fill("View-id(short)",24," "));
- buf.append(fill("Count",8," "));
+ buf.append(leftfill("Member",30," "));
+ buf.append(leftfill("Running",10," "));
+ buf.append(leftfill("Coord",30," "));
+ buf.append(leftfill("View-id(short)",24," "));
+ buf.append(leftfill("Count",8," "));
buf.append("\n");
- buf.append(fill("",SCREEN_WIDTH,"="));
+
+ buf.append(rightfill("==="+new
java.sql.Timestamp(System.currentTimeMillis()).toString(),SCREEN_WIDTH,"="));
buf.append("\n");
return buf.toString();
}
@@ -179,10 +183,19 @@
demo.waitForInput();
}
- public static String fill(String value, int length, String ch) {
+ public static String leftfill(String value, int length, String ch) {
+ return fill(value,length,ch,true);
+ }
+
+ public static String rightfill(String value, int length, String ch) {
+ return fill(value,length,ch,false);
+ }
+
+ public static String fill(String value, int length, String ch, boolean
left) {
StringBuffer buf = new StringBuffer();
+ if ( !left ) buf.append(value.trim());
for (int i=value.trim().length(); i<length; i++ ) buf.append(ch);
- buf.append(value.trim());
+ if ( left ) buf.append(value.trim());
return buf.toString();
}
@@ -193,7 +206,7 @@
NonBlockingCoordinator interceptor = null;
public String status;
public Exception error;
- public boolean started = false;
+ public String startstatus = "new";
public Status(CoordinationDemo parent) {
this.parent = parent;
@@ -201,7 +214,7 @@
public String getStatusLine() {
//member - 30
- //running- 8
+ //running- 10
//coord - 30
//view-id - 24
//view count - 8
@@ -217,11 +230,11 @@
viewId =
getByteString(interceptor.getViewId()!=null?interceptor.getViewId().getBytes():new
byte[0]);
count = String.valueOf(interceptor.getView().length);
}
- buf.append(fill(local,30," "));
- buf.append(fill(String.valueOf(started), 8, " "));
- buf.append(fill(coord, 30, " "));
- buf.append(fill(viewId, 24, " "));
- buf.append(fill(count, 8, " "));
+ buf.append(leftfill(local,30," "));
+ buf.append(leftfill(startstatus, 10, " "));
+ buf.append(leftfill(coord, 30, " "));
+ buf.append(leftfill(viewId, 24, " "));
+ buf.append(leftfill(count, 8, " "));
buf.append("\n");
buf.append("Status:"+status);
buf.append("\n");
@@ -237,15 +250,21 @@
try {
if ( channel == null ) {
channel = createChannel();
+ startstatus = "starting";
channel.start(channel.DEFAULT);
- started = true;
+ startstatus = "running";
} else {
status = "Channel already started.";
}
} catch ( Exception x ) {
+ synchronized (System.err) {
+ System.err.println("Start failed:");
+ StackTraceElement[] els = x.getStackTrace();
+ for (int i = 0; i < els.length; i++)
System.err.println(els[i].toString());
+ }
status = "Start failed:"+x.getMessage();
error = x;
- started = false;
+ startstatus = "failed";
}
}
@@ -258,10 +277,16 @@
status = "Channel Already Stopped";
}
}catch ( Exception x ) {
+ synchronized (System.err) {
+ System.err.println("Stop failed:");
+ StackTraceElement[] els = x.getStackTrace();
+ for (int i = 0; i < els.length; i++)
System.err.println(els[i].toString());
+ }
+
status = "Stop failed:"+x.getMessage();
error = x;
}finally {
- started = false;
+ startstatus = "stopped";
channel = null;
interceptor = null;
}
@@ -269,11 +294,12 @@
public GroupChannel createChannel() {
channel = new GroupChannel();
+ ((ReceiverBase)channel.getChannelReceiver()).setAutoBind(100);
interceptor = new NonBlockingCoordinator() {
public void fireInterceptorEvent(InterceptorEvent event) {
status = event.getEventTypeDesc();
parent.printScreen();
- try { Thread.sleep(100); }catch ( Exception x){}
+ try { Thread.sleep(SLEEP_TIME); }catch ( Exception x){}
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]