Author: fhanik
Date: Tue Jun 13 21:05:24 2006
New Revision: 414056
URL: http://svn.apache.org/viewvc?rev=414056&view=rev
Log:
Created a coordination demo, text only, to show how election works
Added:
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java?rev=414056&r1=414055&r2=414056&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
Tue Jun 13 21:05:24 2006
@@ -171,6 +171,7 @@
interface InterceptorEvent {
int getEventType();
+ String getEventTypeDesc();
ChannelInterceptor getInterceptor();
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=414056&r1=414055&r2=414056&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
Tue Jun 13 21:05:24 2006
@@ -14,10 +14,12 @@
*/
package org.apache.catalina.tribes.group.interceptors;
-import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.UniqueId;
@@ -30,12 +32,6 @@
import org.apache.catalina.tribes.membership.Membership;
import org.apache.catalina.tribes.util.Arrays;
import org.apache.catalina.tribes.util.UUIDGenerator;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.catalina.tribes.membership.*;
-import org.apache.catalina.tribes.test.interceptors.TestNonBlockingCoordinator;
-import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
-import org.apache.catalina.tribes.ChannelInterceptor;
/**
* <p>Title: Auto merging leader election algorithm</p>
@@ -217,9 +213,11 @@
//no message arrived, send the coord msg
fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting
timed out."));
startElection(true);
+ } else {
+ fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election
abandoned"));
}
}//end if
- fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election in
progress"));
+
}
}
@@ -372,7 +370,7 @@
}
viewChange(viewId,view.getMembers());
- fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View
id("+this.viewId+")"));
+ fireInterceptorEvent(new
CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View"));
if ( suggestedviewId == null &&
hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
startElection(false);
@@ -401,6 +399,14 @@
return (view != null && view.hasMembers()) ? view.getMembers()[0] :
null;
}
+ public Member[] getView() {
+ return (view != null && view.hasMembers()) ? view.getMembers() : new
Member[0];
+ }
+
+ public UniqueId getViewId() {
+ return viewId;
+ }
+
/**
* Block in/out messages while a election is going on
*/
@@ -723,6 +729,7 @@
static final int EVT_SEND_MSG = 10;
static final int EVT_STOP = 11;
static final int EVT_CONF_RX = 12;
+ static final int EVT_ELECT_ABANDONED = 13;
int type;
ChannelInterceptor interceptor;
@@ -743,6 +750,25 @@
public int getEventType() {
return type;
+ }
+
+ public String getEventTypeDesc() {
+ switch (type) {
+ case EVT_START: return "EVT_START:"+info;
+ case EVT_MBR_ADD: return "EVT_MBR_ADD:"+info;
+ case EVT_MBR_DEL: return "EVT_MBR_DEL:"+info;
+ case EVT_START_ELECT: return "EVT_START_ELECT:"+info;
+ case EVT_PROCESS_ELECT: return "EVT_PROCESS_ELECT:"+info;
+ case EVT_MSG_ARRIVE: return "EVT_MSG_ARRIVE:"+info;
+ case EVT_PRE_MERGE: return "EVT_PRE_MERGE:"+info;
+ case EVT_POST_MERGE: return "EVT_POST_MERGE:"+info;
+ case EVT_WAIT_FOR_MSG: return "EVT_WAIT_FOR_MSG:"+info;
+ case EVT_SEND_MSG: return "EVT_SEND_MSG:"+info;
+ case EVT_STOP: return "EVT_STOP:"+info;
+ case EVT_CONF_RX: return "EVT_CONF_RX:"+info;
+ case EVT_ELECT_ABANDONED: return "EVT_ELECT_ABANDONED:"+info;
+ default: return "Unknown";
+ }
}
public ChannelInterceptor getInterceptor() {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=414056&r1=414055&r2=414056&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
Tue Jun 13 21:05:24 2006
@@ -46,7 +46,7 @@
}
public static String toString(byte[] data) {
- return toString(data,0,data.length);
+ return toString(data,0,data!=null?data.length:0);
}
public static String toString(byte[] data, int offset, int length) {
Added:
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java?rev=414056&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
Tue Jun 13 21:05:24 2006
@@ -0,0 +1,245 @@
+package org.apache.catalina.tribes.demos;
+
+import org.apache.catalina.tribes.group.GroupChannel;
+import
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.util.Arrays;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+
+
+/**
+ * Text-only console demo of coordinator election. It creates CHANNEL_COUNT
+ * members in this process, each backed by a GroupChannel with a
+ * NonBlockingCoordinator, and redraws a status table whenever a command is
+ * entered or a coordinator fires an interceptor event.
+ *
+ * Commands read from standard input: "start [member id]", "stop [member id]"
+ * and "quit". Member ids are 1-based, as shown in the first table column.
+ */
+public class CoordinationDemo {
+    static int CHANNEL_COUNT = 5;
+    static int SCREEN_WIDTH = 120;
+    StringBuffer statusLine = new StringBuffer();
+    Status[] status = new Status[CHANNEL_COUNT];
+    BufferedReader reader = null;
+
+    /**
+     * Creates the demo and attaches a line reader to standard input.
+     */
+    public CoordinationDemo() {
+        reader = new BufferedReader(new InputStreamReader(System.in));
+    }
+
+    /**
+     * Scrolls the previous output away by printing 30 newlines.
+     */
+    public void clearScreen() {
+        StringBuffer buf = new StringBuffer(700);
+        for (int i=0; i<30; i++ ) buf.append("\n");
+        System.out.println(buf);
+    }
+
+    /**
+     * Prints the list of supported commands followed by the input prompt.
+     */
+    public void printMenuOptions() {
+        System.out.println("Commands:");
+        System.out.println("start [member id]");
+        System.out.println("stop [member id]");
+        System.out.println("quit");
+        System.out.print("Enter command:");
+    }
+
+    /**
+     * Redraws the whole screen: header, one entry per member, the overall
+     * status and the menu. Synchronized because it is called both from the
+     * command loop and from the coordinators' event callbacks.
+     */
+    public synchronized void printScreen() {
+        clearScreen();
+        System.out.println("XXX. "+getHeader());
+        for ( int i=0; i<status.length; i++ ) {
+            System.out.print(fill(String.valueOf(i+1)+".",5," "));
+            if ( status[i] != null ) System.out.print(status[i].getStatusLine());
+        }
+        System.out.println("\n\n");
+        System.out.println("Overall status:"+statusLine);
+        printMenuOptions();
+    }
+
+    /**
+     * Builds the table header and the separator line below it.
+     *
+     * @return the column titles, right-aligned to the same widths that
+     *         Status.getStatusLine() uses, followed by a line of '='
+     */
+    public String getHeader() {
+        //member - 30
+        //running- 8
+        //coord - 30
+        //view-id - 24
+        //view count - 8
+
+        StringBuffer buf = new StringBuffer();
+        buf.append(fill("Member",30," "));
+        buf.append(fill("Running",8," "));
+        buf.append(fill("Coord",30," "));
+        buf.append(fill("View-id(short)",24," "));
+        buf.append(fill("Count",8," "));
+        buf.append("\n");
+        buf.append(fill("",SCREEN_WIDTH,"="));
+        buf.append("\n");
+        return buf.toString();
+    }
+
+    /**
+     * Splits a command line on spaces.
+     *
+     * @param line the text to split; null (end of input) is treated like an
+     *             empty line
+     * @return the tokens in order, or an empty array if there are none
+     */
+    public String[] tokenize(String line) {
+        //readLine() hands us null at end of input, StringTokenizer would throw on it
+        if ( line == null ) return new String[0];
+        StringTokenizer tz = new StringTokenizer(line," ");
+        String[] result = new String[tz.countTokens()];
+        for (int i=0; i<result.length; i++ ) result[i] = tz.nextToken();
+        return result;
+    }
+
+    /**
+     * Runs the command loop. The loop ends on "quit", on a line without any
+     * tokens or at end of input; all members are stopped before returning.
+     * Unrecognized commands only cause a redraw.
+     *
+     * @throws IOException if reading from standard input fails
+     */
+    public void waitForInput() throws IOException {
+        for ( int i=0; i<status.length; i++ ) status[i] = new Status(this);
+        printScreen();
+        String[] args = tokenize(reader.readLine());
+        while ( args.length >= 1 && (!"quit".equalsIgnoreCase(args[0]))) {
+            if ("start".equalsIgnoreCase(args[0])) {
+                if ( args.length == 1 ) {
+                    setSystemStatus("System starting up...");
+                    for (int i = 0; i < status.length; i++) status[i].start();
+                    setSystemStatus("System started.");
+                } else {
+                    int index = parseMemberIndex(args[1]);
+                    if ( index >= 0 ) {
+                        setSystemStatus("Starting member:"+(index+1));
+                        status[index].start();
+                        setSystemStatus("Member started:"+(index+1));
+                    }
+                }
+            } else if ("stop".equalsIgnoreCase(args[0])) {
+                if ( args.length == 1 ) {
+                    setSystemStatus("System shutting down...");
+                    for (int i = 0; i < status.length; i++) status[i].stop();
+                    setSystemStatus("System stopped.");
+                } else {
+                    int index = parseMemberIndex(args[1]);
+                    if ( index >= 0 ) {
+                        setSystemStatus("Stopping member:"+(index+1));
+                        status[index].stop();
+                        setSystemStatus("Member stopped:"+(index+1));
+                    }
+                }
+            }
+            printScreen();
+            args = tokenize(reader.readLine());
+        }
+        for ( int i=0; i<status.length; i++ ) status[i].stop();
+    }
+
+    /**
+     * Converts a 1-based member id typed by the user into an index into the
+     * status array. If the text is not a number or does not name an existing
+     * member, the overall status is set to "Invalid index:..." instead.
+     *
+     * @param arg the member id as typed
+     * @return the 0-based index, or -1 if the id is invalid
+     */
+    private int parseMemberIndex(String arg) {
+        int index = -1;
+        try {
+            index = Integer.parseInt(arg)-1;
+        } catch ( NumberFormatException x ) {
+            index = -1;
+        }
+        if ( index < 0 || index >= status.length ) {
+            setSystemStatus("Invalid index:"+arg);
+            return -1;
+        }
+        return index;
+    }
+
+    /**
+     * Replaces the text shown after "Overall status:".
+     *
+     * @param status the new overall status text
+     */
+    public void setSystemStatus(String status) {
+        statusLine.delete(0,statusLine.length());
+        statusLine.append(status);
+    }
+
+    public static void main(String[] args) throws Exception {
+        CoordinationDemo demo = new CoordinationDemo();
+        demo.waitForInput();
+    }
+
+    /**
+     * Right-aligns a value by padding it on the left.
+     *
+     * @param value  the text to align; it is trimmed first
+     * @param length the minimum width, counted in repetitions of ch
+     * @param ch     the padding text
+     * @return the trimmed value preceded by enough copies of ch to reach
+     *         length; values that are already longer are returned unpadded
+     */
+    public static String fill(String value, int length, String ch) {
+        StringBuffer buf = new StringBuffer();
+        for (int i=value.trim().length(); i<length; i++ ) buf.append(ch);
+        buf.append(value.trim());
+        return buf.toString();
+    }
+
+
+    /**
+     * One row of the demo: a channel, its coordinator interceptor and the
+     * description of the last event that coordinator reported.
+     */
+    public static class Status {
+        public CoordinationDemo parent;
+        public GroupChannel channel;
+        NonBlockingCoordinator interceptor = null;
+        public String status;
+        public Exception error;
+        public boolean started = false;
+
+        public Status(CoordinationDemo parent) {
+            this.parent = parent;
+        }
+
+        /**
+         * Formats this member as a table row followed by a "Status:" line.
+         *
+         * @return local member name, started flag, coordinator name, the
+         *         first bytes of the view id and the view size, using the
+         *         column widths of getHeader(); name columns are empty and
+         *         the count is 0 while there is no channel
+         */
+        public String getStatusLine() {
+            //member - 30
+            //running- 8
+            //coord - 30
+            //view-id - 24
+            //view count - 8
+            StringBuffer buf = new StringBuffer();
+            String local = "";
+            String coord = "";
+            String viewId = "";
+            String count = "0";
+            //read the fields once: stop() sets them to null without holding
+            //any lock while an event callback may be printing this row
+            GroupChannel ch = channel;
+            NonBlockingCoordinator coordinator = interceptor;
+            if ( ch != null && coordinator != null ) {
+                Member lm = ch.getLocalMember(false);
+                local = lm!=null?lm.getName():"";
+                Member leader = coordinator.getCoordinator();
+                coord = leader!=null?leader.getName():"";
+                org.apache.catalina.tribes.UniqueId id = coordinator.getViewId();
+                viewId = getByteString(id!=null?id.getBytes():new byte[0]);
+                count = String.valueOf(coordinator.getView().length);
+            }
+            buf.append(fill(local,30," "));
+            buf.append(fill(String.valueOf(started), 8, " "));
+            buf.append(fill(coord, 30, " "));
+            buf.append(fill(viewId, 24, " "));
+            buf.append(fill(count, 8, " "));
+            buf.append("\n");
+            buf.append("Status:"+status);
+            buf.append("\n");
+            return buf.toString();
+        }
+
+        /**
+         * Renders at most the first four bytes of an id.
+         *
+         * @param b the bytes to render, may be null
+         * @return "{}" for null, otherwise Arrays.toString of the prefix
+         */
+        public String getByteString(byte[] b) {
+            if ( b == null ) return "{}";
+            return Arrays.toString(b,0,Math.min(b.length,4));
+        }
+
+        /**
+         * Creates and starts the channel unless one already exists. On
+         * failure the exception is kept in error, the status line reports
+         * it, and the half-built channel is dropped so that a later start
+         * can try again.
+         */
+        public void start() {
+            try {
+                if ( channel == null ) {
+                    channel = createChannel();
+                    channel.start(channel.DEFAULT);
+                    started = true;
+                } else {
+                    status = "Channel already started.";
+                }
+            } catch ( Exception x ) {
+                //createChannel() has already stored the channel in the field;
+                //leaving it there would make every retry report
+                //"Channel already started."
+                GroupChannel failed = channel;
+                channel = null;
+                interceptor = null;
+                if ( failed != null ) {
+                    //best effort: give back whatever did start before the failure
+                    try { failed.stop(failed.DEFAULT); } catch ( Exception ignored ) { }
+                }
+                status = "Start failed:"+x.getMessage();
+                error = x;
+                started = false;
+            }
+        }
+
+        /**
+         * Stops the channel if there is one. The channel and interceptor
+         * references are cleared even when stopping fails.
+         */
+        public void stop() {
+            try {
+                if ( channel != null ) {
+                    channel.stop(channel.DEFAULT);
+                    status = "Channel Stopped";
+                }
+            }catch ( Exception x ) {
+                status = "Stop failed:"+x.getMessage();
+                error = x;
+            }finally {
+                started = false;
+                channel = null;
+                interceptor = null;
+            }
+        }
+
+        /**
+         * Builds a new channel and stores it, together with its coordinator,
+         * in this object's fields. The coordinator's event hook is replaced
+         * so that each event becomes this row's status text and triggers a
+         * redraw.
+         *
+         * @return the new channel, not yet started
+         */
+        public GroupChannel createChannel() {
+            channel = new GroupChannel();
+            interceptor = new NonBlockingCoordinator() {
+                public void fireInterceptorEvent(InterceptorEvent event) {
+                    status = event.getEventTypeDesc();
+                    parent.printScreen();
+                    //short pause so that each event stays on screen for a moment
+                    try {
+                        Thread.sleep(100);
+                    } catch ( InterruptedException x ) {
+                        //keep the interrupt visible to the thread that raised the event
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            };
+            channel.addInterceptor(interceptor);
+            channel.addInterceptor(new TcpFailureDetector());
+            channel.addInterceptor(new MessageDispatch15Interceptor());
+            return channel;
+        }
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]