Author: fhanik
Date: Sun Mar 12 07:39:15 2006
New Revision: 385300
URL: http://svn.apache.org/viewcvs?rev=385300&view=rev
Log:
The replicated map implements membership logic on a per map basis. so that you
can have multiple maps in a cluster.
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=385300&r1=385299&r2=385300&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Sun Mar 12 07:39:15 2006
@@ -89,6 +89,7 @@
private transient byte[] mapContextName;
private transient boolean stateTransferred = false;
private transient Object stateMutex = new Object();
+ private transient ArrayList mapMembers = new ArrayList();
//------------------------------------------------------------------------------
@@ -122,6 +123,19 @@
this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
this.channel.addChannelListener(this);
this.channel.addMembershipListener(this);
+
+ try {
+ MapMessage msg = new
MapMessage(this.mapContextName,MapMessage.MSG_START,
+
false,null,null,null,channel.getLocalMember());
+ Response[] resp =
rpcChannel.send(channel.getMembers(),msg,rpcChannel.FIRST_REPLY,timeout);
+ for ( int i=0; i<resp.length; i++ ) {
+ messageReceived(resp[i].getMessage(),resp[i].getSource());
+ }
+ }catch ( ChannelException x ) {
+ log.warn("Unable to send stop message.");
+ }
+
+
transferState();
}
@@ -130,6 +144,14 @@
}
public void finalize() {
+ try {
+ MapMessage msg = new
MapMessage(this.mapContextName,MapMessage.MSG_STOP,
+
false,null,null,null,channel.getLocalMember());
+ if ( channel!=null) channel.send(channel.getMembers(),msg);
+ }catch ( ChannelException x ) {
+ log.warn("Unable to send stop message.",x);
+ }
+
if ( this.rpcChannel!=null ) {
this.rpcChannel.breakDown();
}
@@ -139,6 +161,7 @@
}
this.rpcChannel = null;
this.channel = null;
+ this.mapMembers.clear();
super.clear();
this.stateTransferred = false;
}
@@ -205,7 +228,7 @@
//------------------------------------------------------------------------------
public void transferState() {
try {
- Member backup =
channel.getMembers().length>0?channel.getMembers()[0]:null;
+ Member backup = mapMembers.size()>0?(Member)mapMembers.get(0):null;
if ( backup != null ) {
MapMessage msg = new
MapMessage(mapContextName,MapMessage.MSG_STATE,false,
null,null,null,null);
@@ -244,6 +267,13 @@
if ( !(msg instanceof MapMessage) ) return null;
MapMessage mapmsg = (MapMessage)msg;
+ //map start request
+ if ( mapmsg.getMsgType() == mapmsg.MSG_START ) {
+ mapMemberAdded(sender);
+ mapmsg.setBackUpNode(channel.getLocalMember());
+ return mapmsg;
+ }
+
//backup request
if ( mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP ) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
@@ -281,7 +311,13 @@
* @param sender Member
*/
public void leftOver(Serializable msg, Member sender) {
- //ignore left over responses
+ //left over membership messages
+ if ( !(msg instanceof MapMessage) ) return;
+
+ MapMessage mapmsg = (MapMessage)msg;
+ if ( mapmsg.getMsgType() == MapMessage.MSG_START ) {
+ mapMemberAdded(mapmsg.getBackupNode());
+ }
}
public void messageReceived(Serializable msg, Member sender) {
@@ -290,6 +326,15 @@
if ( !(msg instanceof MapMessage) ) return;
MapMessage mapmsg = (MapMessage)msg;
+
+ if ( mapmsg.getMsgType() == MapMessage.MSG_START ) {
+ mapMemberAdded(mapmsg.getBackupNode());
+ }
+
+ if ( mapmsg.getMsgType() == MapMessage.MSG_STOP ) {
+ memberDisappeared(mapmsg.getBackupNode());
+ }
+
if ( mapmsg.getMsgType() == MapMessage.MSG_PROXY ) {
MapEntry entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue());
entry.setBackup(false);
@@ -341,8 +386,9 @@
return false;
}
- public void memberAdded(Member member) {
+ public void mapMemberAdded(Member member) {
//select a backup node if we don't have one
+ mapMembers.add(member);
synchronized (stateMutex) {
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
@@ -360,7 +406,12 @@
}//synchronized
}
+ public void memberAdded(Member member) {
+ //do nothing
+ }
+
public void memberDisappeared(Member member) {
+ mapMembers.remove(member);
//todo move all sessions that are primary here to and have this member
as
//a backup
Iterator i = super.entrySet().iterator();
@@ -380,14 +431,13 @@
int currentNode = 0;
public Member getNextBackupNode() {
- Member[] members = channel.getMembers();
- if ( members.length == 0 ) return null;
+ if ( mapMembers.size() == 0 ) return null;
int node = currentNode++;
- if ( node >= members.length ) {
+ if ( node >= mapMembers.size() ) {
node = 0;
currentNode = 0;
}
- return members[node];
+ return (Member)mapMembers.get(node);
}
@@ -411,7 +461,7 @@
//publish the data out to all nodes
MapMessage msg = new MapMessage(this.mapContextName,
MapMessage.MSG_PROXY, false,
(Serializable) key, null, null,
backup);
- channel.send(channel.getMembers(), msg);
+ channel.send((Member[])mapMembers.toArray(new
Member[mapMembers.size()]), msg);
//publish the backup data to one node
msg = new MapMessage(this.mapContextName, MapMessage.MSG_BACKUP, false,
@@ -469,13 +519,14 @@
Object old = null;
//make sure that any old values get removed
- if ( containsKey(key) ) old = (MapEntry)remove(key);
+ if ( containsKey(key) ) old = remove(key);
try {
Member backup = publishEntryInfo(key, value);
entry.setBackupNode(backup);
} catch (ChannelException x) {
log.error("Unable to replicate out data for a
LazyReplicatedMap.put operation", x);
}
+ System.out.println("adding key="+key+" entry="+entry);
super.put(key,entry);
return old;
}
@@ -719,6 +770,8 @@
public static final int MSG_PROXY = 3;
public static final int MSG_REMOVE = 4;
public static final int MSG_STATE = 5;
+ public static final int MSG_START = 6;
+ public static final int MSG_STOP = 7;
private byte[] mapId;
private int msgtype;
@@ -767,6 +820,10 @@
return node;
}
+ private void setBackUpNode(Member node) {
+ this.node = node;
+ }
+
public byte[] getMapId() {
return mapId;
}
@@ -811,6 +868,14 @@
if ( d.length > 0 ) node = McastMember.getMember(d);
break;
}
+ case MSG_START :
+ MSG_STOP :{
+ byte[] d = new byte[in.readInt()];
+ in.read(d);
+ if ( d.length > 0 ) node = McastMember.getMember(d);
+ break;
+ }
+
}//switch
}//readExternal
@@ -845,6 +910,13 @@
}
case MSG_PROXY: {
out.writeObject(key);
+ byte[] d =
node!=null?((McastMember)node).getData(false):new byte[0];
+ out.writeInt(d.length);
+ out.write(d);
+ break;
+ }
+ case MSG_START:
+ MSG_STOP : {
byte[] d =
node!=null?((McastMember)node).getData(false):new byte[0];
out.writeInt(d.length);
out.write(d);
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=385300&r1=385299&r2=385300&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
Sun Mar 12 07:39:15 2006
@@ -97,6 +97,7 @@
public void messageReceived(Serializable msg, Member sender) {
RpcMessage rmsg = (RpcMessage)msg;
+System.out.println("Received RPC message with message:"+rmsg.message);
RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
if ( rmsg.reply ) {
RpcCollector collector = (RpcCollector)responseMap.get(key);
Modified:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=385300&r1=385299&r2=385300&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
Sun Mar 12 07:39:15 2006
@@ -65,7 +65,6 @@
}
public void messageReceived(Serializable msg, Member source) {
- System.out.println("Recieved: "+msg);
table.dataModel.getValueAt(-1,-1);
}
@@ -135,7 +134,7 @@
public static class SimpleTableDemo
extends JPanel implements ActionListener{
- private static int WIDTH = 500;
+ private static int WIDTH = 550;
private LazyReplicatedMap map;
private boolean DEBUG = false;
@@ -217,7 +216,7 @@
//create a add value button
JPanel addpanel = new JPanel();
- addpanel.setPreferredSize(new Dimension(WIDTH,20));
+ addpanel.setPreferredSize(new Dimension(WIDTH,30));
addpanel.add(createButton("Add","add"));
addpanel.add(txtAddKey);
addpanel.add(txtAddValue);
@@ -225,7 +224,7 @@
//create a remove value button
JPanel removepanel = new JPanel( );
- removepanel.setPreferredSize(new Dimension(WIDTH,20));
+ removepanel.setPreferredSize(new Dimension(WIDTH,30));
removepanel.add(createButton("Remove","remove"));
removepanel.add(txtRemoveKey);
@@ -236,7 +235,7 @@
changepanel.add(createButton("Change","change"));
changepanel.add(txtChangeKey);
changepanel.add(txtChangeValue);
- changepanel.setPreferredSize(new Dimension(WIDTH,20));
+ changepanel.setPreferredSize(new Dimension(WIDTH,30));
add(changepanel);
@@ -244,7 +243,7 @@
JPanel syncpanel = new JPanel( );
syncpanel.add(createButton("Synchronize","sync"));
syncpanel.add(createButton("Replicate","replicate"));
- syncpanel.setPreferredSize(new Dimension(WIDTH,20));
+ syncpanel.setPreferredSize(new Dimension(WIDTH,30));
add(syncpanel);
@@ -323,6 +322,7 @@
//Display the window.
frame.setSize(450,250);
+ newContentPane.setSize(450,300);
frame.pack();
frame.setVisible(true);
return newContentPane;
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]