Author: fhanik
Date: Fri Mar 10 16:58:26 2006
New Revision: 384974
URL: http://svn.apache.org/viewcvs?rev=384974&view=rev
Log:
Completed the LazyReplcatedMap and a demo that proves its concept
Added:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=384974&r1=384973&r2=384974&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
Fri Mar 10 16:58:26 2006
@@ -93,7 +93,8 @@
public void send(Member[] destination, Serializable msg) throws
ChannelException {
if ( msg == null ) return;
try {
- if ( destination == null ) destination = getMembers();
+ if ( destination == null ) throw new ChannelException("No
destination given");
+ if ( destination.length == 0 ) return;
int options = 0;
ClusterData data = new ClusterData();//generates a unique Id
data.setAddress(getLocalMember());
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=384974&r1=384973&r2=384974&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Fri Mar 10 16:58:26 2006
@@ -58,6 +58,7 @@
private transient RpcChannel rpcChannel;
private transient byte[] mapContextName;
private transient boolean stateTransferred = false;
+ private transient Object stateMutex = new Object();
//------------------------------------------------------------------------------
@@ -184,6 +185,12 @@
ArrayList list = (ArrayList)msg.getValue();
for (int i=0; i<list.size(); i++ ) {
MapMessage m = (MapMessage)list.get(i);
+
+ //make sure we don't store that actual object as
primary or backup
+ MapEntry local = (MapEntry)super.get(m.getKey());
+ if ( local != null && (!local.isProxy() ) ) continue;
+
+ //store the object
MapEntry entry = new MapEntry(m.getKey(),m.getValue());
entry.setBackup(false);
entry.setProxy(true);
@@ -217,18 +224,20 @@
//state transfer request
if ( mapmsg.getMsgType() == mapmsg.MSG_STATE ) {
- ArrayList list = new ArrayList();
- Iterator i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
- MapMessage me = new
MapMessage(mapContextName,MapMessage.MSG_PROXY,
-
false,(Serializable)entry.getKey(),(Serializable)entry.getValue(),
- null,entry.getBackupNode());
- list.add(me);
- }
- mapmsg.setValue(list);
- return mapmsg;
+ synchronized (stateMutex) { //make sure we dont do two things at
the same time
+ ArrayList list = new ArrayList();
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ MapMessage me = new MapMessage(mapContextName,
MapMessage.MSG_PROXY,
+ false, (Serializable) entry.getKey(), (Serializable)
entry.getValue(),
+ null, entry.getBackupNode());
+ list.add(me);
+ }
+ mapmsg.setValue(list);
+ return mapmsg;
+ }//synchronized
}
return null;
@@ -259,6 +268,11 @@
super.put(entry.getKey(),entry);
}
+ if ( mapmsg.getMsgType() == MapMessage.MSG_REMOVE ) {
+ super.remove(mapmsg.getKey());
+ }
+
+
if ( mapmsg.getMsgType() == MapMessage.MSG_BACKUP ) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if ( entry == null ) {
@@ -268,6 +282,9 @@
entry.setBackupNode(mapmsg.getBackupNode());
super.put(entry.getKey(), entry);
} else {
+ entry.setBackup(true);
+ entry.setProxy(false);
+ entry.setBackupNode(mapmsg.getBackupNode());
if ( entry.getValue() instanceof ReplicatedMapEntry ) {
ReplicatedMapEntry diff =
(ReplicatedMapEntry)entry.getValue();
if ( mapmsg.isDiff() ) {
@@ -295,7 +312,23 @@
}
public void memberAdded(Member member) {
- //do nothing, we don't care
+ //select a backup node if we don't have one
+ synchronized (stateMutex) {
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if (entry.isPrimary() && entry.getBackupNode() == null) {
+ try {
+ Member backup = publishEntryInfo(entry.getKey(),
entry.getValue());
+ entry.setBackupNode(backup);
+ } catch (ChannelException x) {
+ log.error("Unable to select backup node.", x);
+ }//catch
+ }//end if
+ } //while
+ }//synchronized
+
}
public void memberDisappeared(Member member) {
//todo move all sessions that are primary here to and have this member
as
@@ -342,6 +375,9 @@
protected Member publishEntryInfo(Object key, Object value) throws
ChannelException {
//select a backup node
Member backup = getNextBackupNode();
+
+ if ( backup == null ) return null;
+
//publish the data out to all nodes
MapMessage msg = new MapMessage(this.mapContextName,
MapMessage.MSG_PROXY, false,
(Serializable) key, null, null,
backup);
@@ -497,6 +533,10 @@
return set;
}
+ public int sizeFull() {
+ return super.size();
+ }
+
public int size() {
//todo, implement a counter variable instead
//only count active members in this node
@@ -658,6 +698,8 @@
private byte[] diffvalue;
private Member node;
+ public MapMessage(){}
+
public MapMessage(byte[] mapId,
int msgtype, boolean diff,
Serializable key,Serializable value,
@@ -718,9 +760,16 @@
} else {
value = (Serializable)in.readObject();
}//endif
+ byte[] d = new byte[in.readInt()];
+ in.read(d);
+ if ( d.length > 0 ) node = McastMember.getMember(d);
+ break;
+ }
+ case MSG_RETRIEVE_BACKUP: {
+ key = (Serializable)in.readObject();
+ value = (Serializable)in.readObject();
break;
}
- case MSG_RETRIEVE_BACKUP:
case MSG_REMOVE : {
key = (Serializable)in.readObject();
break;
@@ -729,7 +778,7 @@
key = (Serializable)in.readObject();
byte[] d = new byte[in.readInt()];
in.read(d);
- node = McastMember.getMember(d);
+ if ( d.length > 0 ) node = McastMember.getMember(d);
break;
}
}//switch
@@ -750,16 +799,23 @@
} else {
out.writeObject(value);
}//endif
+ byte[] d =
node!=null?((McastMember)node).getData(false):new byte[0];
+ out.writeInt(d.length);
+ out.write(d);
+ break;
+ }
+ case MSG_RETRIEVE_BACKUP:{
+ out.writeObject(key);
+ out.writeObject(value);
break;
}
- case MSG_RETRIEVE_BACKUP:
case MSG_REMOVE : {
out.writeObject(key);
break;
}
case MSG_PROXY: {
out.writeObject(key);
- byte[] d = ((McastMember)node).getData(false);
+ byte[] d =
node!=null?((McastMember)node).getData(false):new byte[0];
out.writeInt(d.length);
out.write(d);
break;
Added:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=384974&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
Fri Mar 10 16:58:26 2006
@@ -0,0 +1,332 @@
+package org.apache.catalina.tribes.demos;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.tipis.RpcCallback;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.tipis.RpcChannel;
+import org.apache.catalina.tribes.tipis.Response;
+
+import javax.swing.JFrame;
+import javax.swing.JPanel;
+import javax.swing.JScrollPane;
+import javax.swing.JTable;
+import java.awt.Dimension;
+import java.awt.GridLayout;
+import java.awt.event.MouseAdapter;
+import java.awt.event.MouseEvent;
+import javax.swing.JButton;
+import javax.swing.JTextField;
+import java.awt.Panel;
+import javax.swing.BoxLayout;
+import java.awt.ComponentOrientation;
+import javax.swing.table.TableModel;
+import javax.swing.table.AbstractTableModel;
+import javax.swing.table.TableColumnModel;
+import javax.swing.table.DefaultTableColumnModel;
+import javax.swing.table.TableColumn;
+import java.awt.event.ActionEvent;
+import java.awt.event.ActionListener;
+import org.apache.catalina.tribes.tipis.LazyReplicatedMap;
+import org.apache.catalina.tribes.MessageListener;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.MembershipListener;
+import java.util.Map;
+import java.awt.BorderLayout;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class MapDemo implements ChannelListener, MembershipListener{
+
+ protected LazyReplicatedMap map;
+ protected SimpleTableDemo table;
+
+ public MapDemo(Channel channel ) {
+ map = new LazyReplicatedMap(channel,"MapDemo");
+ table =
SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember().getName());
+ channel.addChannelListener(this);
+ channel.addMembershipListener(this);
+ }
+
+ public boolean accept(Serializable msg, Member source) {
+ return true;
+ }
+
+ public void messageReceived(Serializable msg, Member source) {
+ System.out.println("Recieved: "+msg);
+ table.dataModel.getValueAt(-1,-1);
+ }
+
+ public void memberAdded(Member member) {
+ }
+ public void memberDisappeared(Member member) {
+ table.dataModel.getValueAt(-1,-1);
+ }
+
+ public static void usage() {
+ System.out.println("Tribes MapDemo.");
+ System.out.println("Usage:\n\t" +
+ "java MapDemo [channel options]\n\t" +
+ "\tChannel options:" +
+ ChannelCreator.usage());
+ }
+
+ public static void main(String[] args) throws Exception {
+ ManagedChannel channel = (ManagedChannel)
ChannelCreator.createChannel(args);
+ channel.start(channel.DEFAULT);
+ Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
+ MapDemo demo = new MapDemo(channel);
+
+ System.out.println("System test complete, sleeping to let threads
finish.");
+ Thread.sleep(60 * 1000 * 60);
+ }
+
+ public static class Shutdown
+ extends Thread {
+ ManagedChannel channel = null;
+ public Shutdown(ManagedChannel channel) {
+ this.channel = channel;
+ }
+
+ public void run() {
+ System.out.println("Shutting down...");
+ SystemExit exit = new SystemExit(5000);
+ exit.setDaemon(true);
+ exit.start();
+ try {
+ channel.stop(channel.DEFAULT);
+
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ System.out.println("Channel stopped.");
+ }
+ }
+
+ public static class SystemExit
+ extends Thread {
+ private long delay;
+ public SystemExit(long delay) {
+ this.delay = delay;
+ }
+
+ public void run() {
+ try {
+ Thread.sleep(delay);
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ System.exit(0);
+
+ }
+ }
+
+ public static class SimpleTableDemo
+ extends JPanel implements ActionListener{
+ private static int WIDTH = 500;
+
+ private LazyReplicatedMap map;
+ private boolean DEBUG = false;
+ TableModel dataModel = new AbstractTableModel() {
+
+
+ String[] columnNames = {
+ "Key",
+ "Value",
+ "Backup Node",
+ "isPrimary",
+ "isProxy",
+ "isBackup"};
+
+ public int getColumnCount() { return columnNames.length; }
+
+ public int getRowCount() {return map.sizeFull() +1; }
+
+ public Object getValueAt(int row, int col) {
+ if ( row==-1 ) {
+ update();
+ return "";
+ }
+ if ( row == 0 ) return columnNames[col];
+ Object[] entries = map.entrySetFull().toArray();
+
+ Map.Entry e = (Map.Entry)entries [row-1];
+ LazyReplicatedMap.MapEntry entry =
(LazyReplicatedMap.MapEntry)e.getValue();
+ switch (col) {
+ case 0: return entry.getKey();
+ case 1: return entry.getValue();
+ case 2: return
entry.getBackupNode()!=null?entry.getBackupNode().getName():"";
+ case 3: return new Boolean(entry.isPrimary());
+ case 4: return new Boolean(entry.isProxy());
+ case 5: return new Boolean(entry.isBackup());
+ default: return "";
+ }
+ }
+
+ public void update() {
+ fireTableDataChanged();
+ }
+ };
+
+ JTextField txtAddKey = new JTextField(20);
+ JTextField txtAddValue = new JTextField(20);
+ JTextField txtRemoveKey = new JTextField(20);
+ JTextField txtChangeKey = new JTextField(20);
+ JTextField txtChangeValue = new JTextField(20);
+
+
+ public SimpleTableDemo(LazyReplicatedMap map) {
+ super();
+ this.map = map;
+
+ this.setComponentOrientation(ComponentOrientation.LEFT_TO_RIGHT);
+
+ //final JTable table = new JTable(data, columnNames);
+ final JTable table = new JTable(dataModel);
+
+ table.setPreferredScrollableViewportSize(new Dimension(WIDTH,
150));
+
+ if (DEBUG) {
+ table.addMouseListener(new MouseAdapter() {
+ public void mouseClicked(MouseEvent e) {
+ printDebugData(table);
+ }
+ });
+ }
+
+ //setLayout(new GridLayout(5, 0));
+ setLayout(new BoxLayout(this, BoxLayout.Y_AXIS));
+
+ //Create the scroll pane and add the table to it.
+ JScrollPane scrollPane = new JScrollPane(table);
+
+ //Add the scroll pane to this panel.
+ add(scrollPane);
+
+ //create a add value button
+ JPanel addpanel = new JPanel();
+ addpanel.setPreferredSize(new Dimension(WIDTH,20));
+ addpanel.add(createButton("Add","add"));
+ addpanel.add(txtAddKey);
+ addpanel.add(txtAddValue);
+ add(addpanel);
+
+ //create a remove value button
+ JPanel removepanel = new JPanel( );
+ removepanel.setPreferredSize(new Dimension(WIDTH,20));
+ removepanel.add(createButton("Remove","remove"));
+ removepanel.add(txtRemoveKey);
+
+ add(removepanel);
+
+ //create a change value button
+ JPanel changepanel = new JPanel( );
+ changepanel.add(createButton("Change","change"));
+ changepanel.add(txtChangeKey);
+ changepanel.add(txtChangeValue);
+ changepanel.setPreferredSize(new Dimension(WIDTH,20));
+
+ add(changepanel);
+
+ //create sync button
+ JPanel syncpanel = new JPanel( );
+ syncpanel.add(createButton("Synchronize","sync"));
+ syncpanel.add(createButton("Replicate","replicate"));
+ syncpanel.setPreferredSize(new Dimension(WIDTH,20));
+
+ add(syncpanel);
+
+
+ }
+
+ public JButton createButton(String text, String command) {
+ JButton button = new JButton(text);
+ button.setActionCommand(command);
+ button.addActionListener(this);
+ return button;
+ }
+
+ public void actionPerformed(ActionEvent e) {
+ System.out.println(e.getActionCommand());
+ if ( "add".equals(e.getActionCommand()) ) {
+ System.out.println("Add key:"+txtAddKey.getText()+"
value:"+txtAddValue.getText());
+ map.put(txtAddKey.getText(),new
StringBuffer(txtAddValue.getText()));
+ }
+ if ( "change".equals(e.getActionCommand()) ) {
+ System.out.println("Change key:"+txtChangeKey.getText()+"
value:"+txtChangeValue.getText());
+ StringBuffer buf =
(StringBuffer)map.get(txtChangeKey.getText());
+ if ( buf!=null ) {
+ buf.delete(0,buf.length());
+ buf.append(txtChangeValue.getText());
+ }
+ }
+ if ( "remove".equals(e.getActionCommand()) ) {
+ System.out.println("Remove key:"+txtRemoveKey.getText());
+ map.remove(txtRemoveKey.getText());
+ }
+ if ( "sync".equals(e.getActionCommand()) ) {
+ System.out.println("Syncing from another node.");
+ map.transferState();
+ }
+ if ( "replicate".equals(e.getActionCommand()) ) {
+ System.out.println("Replicating out to the other nodes.");
+ map.replicate(true);
+ }
+ dataModel.getValueAt(-1,-1);
+ }
+
+ private void printDebugData(JTable table) {
+ int numRows = table.getRowCount();
+ int numCols = table.getColumnCount();
+ javax.swing.table.TableModel model = table.getModel();
+
+ System.out.println("Value of data: ");
+ for (int i = 0; i < numRows; i++) {
+ System.out.print(" row " + i + ":");
+ for (int j = 0; j < numCols; j++) {
+ System.out.print(" " + model.getValueAt(i, j));
+ }
+ System.out.println();
+ }
+ System.out.println("--------------------------");
+ }
+
+ /**
+ * Create the GUI and show it. For thread safety,
+ * this method should be invoked from the
+ * event-dispatching thread.
+ */
+ public static SimpleTableDemo createAndShowGUI(LazyReplicatedMap map,
String title) {
+ //Make sure we have nice window decorations.
+ JFrame.setDefaultLookAndFeelDecorated(true);
+
+ //Create and set up the window.
+ JFrame frame = new JFrame("SimpleTableDemo - "+title);
+ frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
+
+ //Create and set up the content pane.
+ SimpleTableDemo newContentPane = new SimpleTableDemo(map);
+ newContentPane.setOpaque(true); //content panes must be opaque
+ frame.setContentPane(newContentPane);
+
+ //Display the window.
+ frame.setSize(450,250);
+ frame.pack();
+ frame.setVisible(true);
+ return newContentPane;
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]