Author: fhanik
Date: Wed Mar 22 19:59:38 2006
New Revision: 388020
URL: http://svn.apache.org/viewcvs?rev=388020&view=rev
Log:
more complete state of the replicated map
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
Wed Mar 22 19:59:38 2006
@@ -62,4 +62,8 @@
*/
public long getMemberAliveTime();
+ public boolean isReady();
+ public boolean isSuspect();
+ public boolean isFailing();
+
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
Wed Mar 22 19:59:38 2006
@@ -136,7 +136,7 @@
if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) ==
SEND_OPTIONS_BYTE_MESSAGE ) {
fwd = new ByteMessage(msg.getMessage().getBytes());
} else {
- fwd = XByteBuffer.deserialize(msg.getMessage().getBytes());
+ fwd =
XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength());
}
//get the actual member with the correct alive time
Member source = msg.getAddress();
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
Wed Mar 22 19:59:38 2006
@@ -93,9 +93,7 @@
return clazz;
}
- public Class findExternalClass(String name)
- throws ClassNotFoundException {
-
+ public Class findExternalClass(String name) throws ClassNotFoundException
{
ClassNotFoundException cnfe = null;
for (int i=0; i<classLoaders.length; i++ ) {
try {
@@ -107,6 +105,11 @@
}
if ( cnfe != null ) throw cnfe;
else throw new ClassNotFoundException(name);
+ }
+
+ public void close() throws IOException {
+ this.classLoaders = null;
+ super.close();
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
Wed Mar 22 19:59:38 2006
@@ -23,6 +23,7 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.io.ObjectInputStream;
/**
* The XByteBuffer provides a dual functionality.
@@ -520,18 +521,21 @@
public static Serializable deserialize(byte[] data, int offset, int
length)
throws IOException, ClassNotFoundException, ClassCastException {
- return deserialize(data,offset,length,new ClassLoader[]
{XByteBuffer.class.getClassLoader()});
+ return deserialize(data,offset,length,null);
}
-
+ public static int invokecount = 0;
public static Serializable deserialize(byte[] data, int offset, int
length, ClassLoader[] cls)
throws IOException, ClassNotFoundException, ClassCastException {
+ synchronized (XByteBuffer.class) { invokecount++;}
Object message = null;
- if ( cls == null ) cls = new ClassLoader[]
{XByteBuffer.class.getClassLoader()};
+ if ( cls == null ) cls = new ClassLoader[0];
if (data != null) {
InputStream instream = new
ByteArrayInputStream(data,offset,length);
- ReplicationStream stream = new ReplicationStream(instream,cls);
+ ObjectInputStream stream = null;
+ stream = (cls.length>0)? new ReplicationStream(instream,cls):new
ObjectInputStream(instream);
message = stream.readObject();
instream.close();
+ stream.close();
}
if ( message == null ) {
return null;
@@ -553,6 +557,7 @@
ByteArrayOutputStream outs = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(outs);
out.writeObject(msg);
+ out.flush();
byte[] data = outs.toByteArray();
return data;
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java
Wed Mar 22 19:59:38 2006
@@ -23,6 +23,7 @@
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.io.XByteBuffer;
import java.util.Arrays;
+import org.apache.catalina.tribes.tcp.SenderState;
/**
* A <b>membership</b> implementation using simple multicast.
@@ -106,6 +107,17 @@
this.port = port;
this.domain = domain.getBytes();
this.memberAliveTime=aliveTime;
+ }
+
+
+ public boolean isReady() {
+ return SenderState.getSenderState(this).isReady();
+ }
+ public boolean isSuspect() {
+ return SenderState.getSenderState(this).isSuspect();
+ }
+ public boolean isFailing() {
+ return SenderState.getSenderState(this).isFailing();
}
/**
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
Wed Mar 22 19:59:38 2006
@@ -46,18 +46,17 @@
protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(ReceiverBase.class);
private MessageListener listener;
- private String host;
+ private String host = "auto";
private InetAddress bind;
- private int port;
+ private int port = 4000;
private int rxBufSize = 43800;
private int txBufSize = 25188;
private boolean listen = false;
private ThreadPool pool;
private boolean direct = true;
- private long tcpSelectorTimeout;
- private String tcpListenAddress;
+ private long tcpSelectorTimeout = 100;
//how many times to search for an available socket
- private int autoBind = 1;
+ private int autoBind = 10;
private int maxThreads = 25;
private int minThreads = 6;
@@ -233,7 +232,7 @@
}
public String getTcpListenAddress() {
- return tcpListenAddress;
+ return getHost();
}
public int getAutoBind() {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
Wed Mar 22 19:59:38 2006
@@ -155,13 +155,18 @@
* This is considered an asynchronized request
*/
if (ClusterData.sendAckAsync(msgs[i].getOptions()))
sendAck(key,channel);
- //process the message
- getCallback().messageDataReceived(msgs[i]);
+ try {
+ //process the message
+ getCallback().messageDataReceived(msgs[i]);
+ }catch ( Exception e ) {
+ log.error("Processing of cluster message failed.",e);
+ }
/**
* Use send ack here if you want the request to complete on
this
* server before sending the ack to the remote server
* This is considered a synchronized request
*/
+
if (ClusterData.sendAckSync(msgs[i].getOptions()))
sendAck(key,channel);
}
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
Wed Mar 22 19:59:38 2006
@@ -94,6 +94,7 @@
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
+ * @param cls - a list of classloaders to be used for deserialization of
objects.
*/
public AbstractReplicatedMap(Object owner,
Channel channel,
@@ -101,9 +102,10 @@
String mapContextName,
int initialCapacity,
float loadFactor,
- int channelSendOptions) {
+ int channelSendOptions,
+ ClassLoader[] cls) {
super(initialCapacity, loadFactor);
- init(owner, channel, mapContextName, timeout, channelSendOptions);
+ init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
}
@@ -111,9 +113,9 @@
return new Member[] {m};
}
- private void init(Object owner, Channel channel, String mapContextName,
long timeout, int channelSendOptions) {
+ private void init(Object owner, Channel channel, String mapContextName,
long timeout, int channelSendOptions,ClassLoader[] cls) {
this.mapOwner = owner;
-
+ this.externalLoaders = cls;
this.channelSendOptions = channelSendOptions;
this.channel = channel;
this.rpcTimeout = timeout;
@@ -143,7 +145,7 @@
MapMessage msg = new MapMessage(this.mapContextName, msgtype,
false, null, null, null,
wrap(channel.getLocalMember(false)));
if ( rpc) {
- Response[] resp = rpcChannel.send(channel.getMembers(), msg,
rpcChannel.FIRST_REPLY, channelSendOptions,rpcTimeout);
+ Response[] resp = rpcChannel.send(channel.getMembers(), msg,
rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
for (int i = 0; i < resp.length; i++) {
mapMemberAdded(resp[i].getSource());
messageReceived(resp[i].getMessage(), resp[i].getSource());
@@ -233,7 +235,7 @@
}
try {
if ( entry.getBackupNodes()!= null &&
entry.getBackupNodes().length > 0 ) {
- channel.send(entry.getBackupNodes(), msg,
channel.SEND_OPTIONS_DEFAULT);
+ channel.send(entry.getBackupNodes(), msg,
channelSendOptions);
}
} catch (ChannelException x) {
log.error("Unable to replicate data.", x);
@@ -266,36 +268,22 @@
null, null, null, null);
Response[] resp = rpcChannel.send(new Member[] {backup}, msg,
rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
if (resp.length > 0) {
- msg = (MapMessage) resp[0].getMessage();
- ArrayList list = (ArrayList) msg.getValue();
- for (int i = 0; i < list.size(); i++) {
-
messageReceived((Serializable)list.get(i),resp[0].getSource());
-// MapMessage m = (MapMessage) list.get(i);
-// try {
-// m.deserialize(getExternalLoaders());
-// //make sure we don't store that actual object as
primary or backup
-// MapEntry local = (MapEntry)super.get(m.getKey());
-// if (local != null && (!local.isProxy()))
continue;
-//
-// //store the object
-// if (m.getValue()!=null && m.getValue()
instanceof ReplicatedMapEntry ) {
-//
((ReplicatedMapEntry)m.getValue()).setOwner(getMapOwner());
-// }
-// MapEntry entry = new MapEntry(m.getKey(),
m.getValue());
-// entry.setBackup(false);
-// entry.setProxy(true);
-// entry.setBackupNodes(m.getBackupNodes());
-// super.put(entry.getKey(), entry);
-// } catch (IOException x) {
-// log.error("Unable to deserialize MapMessage.",
x);
-// } catch (ClassNotFoundException x) {
-// log.error("Unable to deserialize MapMessage.",
x);
-// }
- }//for
+ synchronized (stateMutex) {
+ msg = (MapMessage) resp[0].getMessage();
+ msg.deserialize(getExternalLoaders());
+ ArrayList list = (ArrayList) msg.getValue();
+ for (int i = 0; i < list.size(); i++) {
+ messageReceived( (Serializable) list.get(i),
resp[0].getSource());
+ } //for
+ }
}
}
} catch (ChannelException x) {
log.error("Unable to transfer LazyReplicatedMap state.", x);
+ } catch (IOException x) {
+ log.error("Unable to transfer LazyReplicatedMap state.", x);
+ } catch (ClassNotFoundException x) {
+ log.error("Unable to transfer LazyReplicatedMap state.", x);
}
stateTransferred = true;
}
@@ -333,12 +321,12 @@
Map.Entry e = (Map.Entry) i.next();
MapEntry entry = (MapEntry) e.getValue();
MapMessage me = new MapMessage(mapContextName,
MapMessage.MSG_PROXY,
- false, (Serializable) entry.getKey(), (Serializable)
entry.getValue(),
- null, entry.getBackupNodes());
+ false, (Serializable) entry.getKey(), null,null,
entry.getBackupNodes());
list.add(me);
}
mapmsg.setValue(list);
return mapmsg;
+
} //synchronized
}
@@ -426,15 +414,15 @@
diff.unlock();
}
} else {
- entry.setValue(mapmsg.getValue());
+ if ( mapmsg.getValue()!=null )
entry.setValue(mapmsg.getValue());
diff.setOwner(getMapOwner());
} //end if
} else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry re =
(ReplicatedMapEntry)mapmsg.getValue();
re.setOwner(getMapOwner());
- entry.setValue(mapmsg.getValue());
+ if ( mapmsg.getValue()!=null )
entry.setValue(mapmsg.getValue());
} else {
- entry.setValue(mapmsg.getValue());
+ if ( mapmsg.getValue()!=null )
entry.setValue(mapmsg.getValue());
} //end if
} //end if
super.put(entry.getKey(), entry);
@@ -459,7 +447,8 @@
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
MapEntry entry = (MapEntry) e.getValue();
- if (entry.isPrimary() && entry.getBackupNodes() == null &&
entry.getBackupNodes().length == 0) {
+ if ( entry == null ) continue;
+ if (entry.isPrimary() && (entry.getBackupNodes() == null ||
entry.getBackupNodes().length == 0)) {
try {
Member[] backup = publishEntryInfo(entry.getKey(),
entry.getValue());
entry.setBackupNodes(backup);
@@ -473,6 +462,7 @@
}
public boolean inSet(Member m, Member[] set) {
+ if ( set == null ) return false;
boolean result = false;
for (int i=0; i<set.length && (!result); i++ )
if ( m.equals(set[i]) ) result = true;
@@ -484,8 +474,6 @@
}
public void memberDisappeared(Member member) {
- Exception ex = new Exception("[DEBUG] Removing
member:"+member.getName());
- ex.printStackTrace();
synchronized (mapMembers) {
mapMembers.remove(member);
}
@@ -527,7 +515,7 @@
protected void printMap(String header) {
try {
System.out.println("\nDEBUG MAP:"+header);
- System.out.println("Map["+((Object)this).toString()+"; " + new
String(mapContextName, chset) + ", Map Size:" + super.size());
+ System.out.println("Map["+ new String(mapContextName, chset) + ",
Map Size:" + super.size());
Member[] mbrs = getMapMembers();
for ( int i=0; i<mbrs.length;i++ ) {
System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName());
@@ -658,7 +646,7 @@
// map message to send to and from other maps
//------------------------------------------------------------------------------
- public static class MapMessage implements Externalizable {
+ public static class MapMessage implements Serializable {
public static final int MSG_BACKUP = 1;
public static final int MSG_RETRIEVE_BACKUP = 2;
public static final int MSG_PROXY = 3;
@@ -670,8 +658,8 @@
private byte[] mapId;
private int msgtype;
private boolean diff;
- private Serializable key;
- private Serializable value;
+ private transient Serializable key;
+ private transient Serializable value;
private byte[] valuedata;
private byte[] keydata;
private byte[] diffvalue;
@@ -681,7 +669,7 @@
public MapMessage(byte[] mapId,int msgtype, boolean diff,
Serializable key, Serializable value,
- byte[] diffvalue, Member[] nodes) {
+ byte[] diffvalue, Member[] nodes) {
this.mapId = mapId;
this.msgtype = msgtype;
this.diff = diff;
@@ -689,6 +677,8 @@
this.value = value;
this.diffvalue = diffvalue;
this.nodes = nodes;
+ setValue(value);
+ setKey(key);
}
public void deserialize(ClassLoader[] cls) throws IOException,
ClassNotFoundException {
@@ -717,6 +707,7 @@
if ( key!=null ) return key;
if ( keydata == null || keydata.length == 0 ) return null;
key = XByteBuffer.deserialize(keydata,0,keydata.length,cls);
+ keydata = null;
return key;
}
@@ -737,6 +728,7 @@
if ( value!=null ) return value;
if ( valuedata == null || valuedata.length == 0 ) return null;
value = XByteBuffer.deserialize(valuedata,0,valuedata.length,cls);
+ valuedata = null;;
return value;
}
@@ -761,7 +753,21 @@
}
public void setValue(Serializable value) {
- this.value = value;
+ try {
+ if ( value != null ) valuedata = XByteBuffer.serialize(value);
+ this.value = value;
+ }catch ( IOException x ) {
+ throw new RuntimeException(x);
+ }
+ }
+
+ public void setKey(Serializable key) {
+ try {
+ if (key != null) keydata = XByteBuffer.serialize(key);
+ this.key = key;
+ } catch (IOException x) {
+ throw new RuntimeException(x);
+ }
}
protected Member[] readMembers(ObjectInput in) throws IOException,
ClassNotFoundException {
@@ -775,51 +781,6 @@
return members;
}
- protected byte[] readBytes(ObjectInput in) throws IOException {
- byte[] data = new byte[in.readInt()];
- in.read(data);
- return data;
- }
-
- public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- mapId = new byte[in.readInt()];
- in.read(mapId);
- msgtype = in.readInt();
- switch (msgtype) {
- case MSG_BACKUP:
- case MSG_STATE: {
- diff = in.readBoolean();
- keydata = readBytes(in);
- if (diff) {
- diffvalue = readBytes(in);
- } else {
- valuedata = readBytes(in);
- } //endif
- nodes = readMembers(in);
- break;
- }
- case MSG_RETRIEVE_BACKUP: {
- keydata = readBytes(in);
- valuedata = readBytes(in);
- break;
- }
- case MSG_REMOVE: {
- keydata = readBytes(in);
- break;
- }
- case MSG_PROXY: {
- keydata = readBytes(in);
- this.nodes = readMembers(in);
- break;
- }
- case MSG_START:
- case MSG_STOP: {
- nodes = readMembers(in);
- break;
- }
- } //switch
- } //readExternal
-
protected void writeMembers(ObjectOutput out,Member[] members) throws
IOException {
if ( members == null ) members = new Member[0];
out.writeInt(members.length);
@@ -832,55 +793,6 @@
}
}
- protected void writeBytes(ObjectOutput out, byte[] data) throws
IOException {
- out.writeInt(data.length);
- out.write(data);
- }
-
- protected void writeObject(ObjectOutput out, Serializable o) throws
IOException {
- byte[] data = XByteBuffer.serialize(o);
- writeBytes(out,data);
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(mapId.length);
- out.write(mapId);
- out.writeInt(msgtype);
- switch (msgtype) {
- case MSG_BACKUP:
- case MSG_STATE: {
- out.writeBoolean(diff);
- writeObject(out,key);
- if (diff) {
- out.writeInt(diffvalue.length);
- out.write(diffvalue);
- } else {
- writeObject(out,value);
- } //endif
- writeMembers(out,nodes);
- break;
- }
- case MSG_RETRIEVE_BACKUP: {
- writeObject(out,key);
- writeObject(out,value);
- break;
- }
- case MSG_REMOVE: {
- writeObject(out,key);
- break;
- }
- case MSG_PROXY: {
- writeObject(out,key);
- writeMembers(out,nodes);
- break;
- }
- case MSG_START:
- case MSG_STOP: {
- writeMembers(out,nodes);
- break;
- }
- } //switch
- } //writeExternal
/**
* shallow clone
@@ -927,12 +839,20 @@
return externalLoaders;
}
+ public int getChannelSendOptions() {
+ return channelSendOptions;
+ }
+
public void setMapOwner(Object mapOwner) {
this.mapOwner = mapOwner;
}
public void setExternalLoaders(ClassLoader[] externalLoaders) {
this.externalLoaders = externalLoaders;
+ }
+
+ public void setChannelSendOptions(int channelSendOptions) {
+ this.channelSendOptions = channelSendOptions;
}
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Wed Mar 22 19:59:38 2006
@@ -84,8 +84,8 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public LazyReplicatedMap(Object owner, Channel channel, long timeout,
String mapContextName, int initialCapacity, float loadFactor) {
-
super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor,
Channel.SEND_OPTIONS_DEFAULT);
+ public LazyReplicatedMap(Object owner, Channel channel, long timeout,
String mapContextName, int initialCapacity, float loadFactor, ClassLoader[]
cls) {
+
super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor,
Channel.SEND_OPTIONS_DEFAULT,cls);
}
/**
@@ -95,8 +95,8 @@
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
*/
- public LazyReplicatedMap(Object owner, Channel channel, long timeout,
String mapContextName, int initialCapacity) {
- super(owner, channel,timeout,mapContextName,initialCapacity,
LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
+ public LazyReplicatedMap(Object owner, Channel channel, long timeout,
String mapContextName, int initialCapacity, ClassLoader[] cls) {
+ super(owner, channel,timeout,mapContextName,initialCapacity,
LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls);
}
/**
@@ -105,8 +105,8 @@
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
*/
- public LazyReplicatedMap(Object owner, Channel channel, long timeout,
String mapContextName) {
- super(owner, channel,timeout,mapContextName,
LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
+ public LazyReplicatedMap(Object owner, Channel channel, long timeout,
String mapContextName, ClassLoader[] cls) {
+ super(owner, channel,timeout,mapContextName,
LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT,
cls);
}
@@ -166,7 +166,7 @@
ReplicatedMapEntry val =
(ReplicatedMapEntry)entry.getValue();
val.setOwner(getMapOwner());
}
- entry.setValue(msg.getValue());
+ if ( msg.getValue()!=null ) entry.setValue(msg.getValue());
}
if (entry.isBackup()) {
//select a new backup node
@@ -203,7 +203,6 @@
public Object put(Object key, Object value) {
- System.out.println("Adding session id:"+key);
if ( !(key instanceof Serializable) ) throw new
IllegalArgumentException("Key is not serializable:"+key.getClass().getName());
if ( value == null ) return remove(key);
if ( !(value instanceof Serializable) ) throw new
IllegalArgumentException("Value is not
serializable:"+value.getClass().getName());
@@ -247,8 +246,9 @@
*/
public Object remove(Object key) {
MapEntry entry = (MapEntry)super.remove(key);
- MapMessage msg = new
MapMessage(getMapContextName(),MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null);
+
try {
+ MapMessage msg = new
MapMessage(getMapContextName(),MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null);
getChannel().send(getMapMembers(),
msg,Channel.SEND_OPTIONS_DEFAULT);
} catch ( ChannelException x ) {
log.error("Unable to replicate out data for a
LazyReplicatedMap.remove operation",x);
@@ -294,6 +294,10 @@
return super.keySet();
}
+ public int sizeFull() {
+ return super.size();
+ }
+
public Set entrySet() {
LinkedHashSet set = new LinkedHashSet(super.size());
Iterator i = super.entrySet().iterator();
@@ -318,9 +322,6 @@
return Collections.unmodifiableSet(set);
}
- public int sizeFull() {
- return super.size();
- }
public int size() {
//todo, implement a counter variable instead
@@ -330,7 +331,7 @@
while ( i.hasNext() ) {
Map.Entry e = (Map.Entry)i.next();
MapEntry entry = (MapEntry)e.getValue();
- if ( entry.isPrimary() ) counter++;
+ if ( entry.isPrimary() && entry.getValue()!=null ) counter++;
}
return counter;
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
Wed Mar 22 19:59:38 2006
@@ -50,6 +50,9 @@
* @todo implement periodic sync/transfer thread
* @author Filip Hanik
* @version 1.0
+ *
+ * @todo memberDisappeared, should do nothing except change map membership
+ * by default it relocates the primary objects
*/
public class ReplicatedMap extends AbstractReplicatedMap implements
RpcCallback, ChannelListener, MembershipListener {
@@ -66,8 +69,8 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public ReplicatedMap(Object owner, Channel channel, long timeout, String
mapContextName, int initialCapacity,float loadFactor) {
- super(owner,channel, timeout, mapContextName, initialCapacity,
loadFactor, Channel.SEND_OPTIONS_DEFAULT);
+ public ReplicatedMap(Object owner, Channel channel, long timeout, String
mapContextName, int initialCapacity,float loadFactor, ClassLoader[] cls) {
+ super(owner,channel, timeout, mapContextName, initialCapacity,
loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls);
}
/**
@@ -77,8 +80,8 @@
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
*/
- public ReplicatedMap(Object owner, Channel channel, long timeout, String
mapContextName, int initialCapacity) {
- super(owner,channel, timeout, mapContextName, initialCapacity,
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
+ public ReplicatedMap(Object owner, Channel channel, long timeout, String
mapContextName, int initialCapacity, ClassLoader[] cls) {
+ super(owner,channel, timeout, mapContextName, initialCapacity,
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT, cls);
}
/**
@@ -87,8 +90,8 @@
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
*/
- public ReplicatedMap(Object owner, Channel channel, long timeout, String
mapContextName) {
- super(owner, channel, timeout,
mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY,
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
+ public ReplicatedMap(Object owner, Channel channel, long timeout, String
mapContextName, ClassLoader[] cls) {
+ super(owner, channel, timeout,
mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY,
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls);
}
//------------------------------------------------------------------------------
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
Wed Mar 22 19:59:38 2006
@@ -79,6 +79,10 @@
long timeout) throws ChannelException {
if ( destination==null || destination.length == 0 ) return new
Response[0];
+
+ //avoid dead lock
+ channelOptions = channelOptions &
~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
+
RpcCollectorKey key = new
RpcCollectorKey(UUIDGenerator.randomUUID(false));
RpcCollector collector = new
RpcCollector(key,rpcOptions,destination.length,timeout);
try {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
Wed Mar 22 19:59:38 2006
@@ -244,7 +244,8 @@
- public static class LoadMessage extends ByteMessage implements
Serializable {
+ //public static class LoadMessage extends ByteMessage implements
Serializable {
+ public static class LoadMessage implements Serializable {
public static byte[] outdata = new byte[size];
public static Random r = new Random(System.currentTimeMillis());
Modified:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
Wed Mar 22 19:59:38 2006
@@ -45,10 +45,14 @@
protected SimpleTableDemo table;
public MapDemo(Channel channel ) {
- map = new LazyReplicatedMap(null,channel,5000, "MapDemo");
+ map = new LazyReplicatedMap(null,channel,5000, "MapDemo",null);
table =
SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName());
channel.addChannelListener(this);
channel.addMembershipListener(this);
+ for ( int i=0; i<1000; i++ ) {
+ map.put("MyKey-"+i,"My String Value-"+i);
+ }
+ this.messageReceived(null,null);
}
public boolean accept(Serializable msg, Member source) {
Modified:
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
(original)
+++
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
Wed Mar 22 19:59:38 2006
@@ -28,6 +28,7 @@
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.io.ReplicationStream;
import org.apache.catalina.tribes.tipis.LazyReplicatedMap;
+import org.apache.catalina.tribes.Channel;
/**
[EMAIL PROTECTED] Filip Hanik
@@ -56,7 +57,10 @@
* Should listeners be notified?
*/
private boolean notifyListenersOnReplication;
-
+ /**
+ *
+ */
+ private int mapSendOptions =
Channel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_SYNCHRONIZED_ACK|Channel.SEND_OPTIONS_USE_ACK;
/**
* Constructor, just calls super()
@@ -139,7 +143,7 @@
ClassLoader classLoader = null;
if (container != null) loader = container.getLoader();
if (loader != null) classLoader = loader.getClassLoader();
- else classLoader = Thread.currentThread().getContextClassLoader();
+ if ( classLoader == null ) classLoader =
Thread.currentThread().getContextClassLoader();
if ( classLoader == Thread.currentThread().getContextClassLoader() ) {
return new ClassLoader[] {classLoader};
} else {
@@ -189,8 +193,9 @@
LazyReplicatedMap map = new LazyReplicatedMap(this,
catclust.getChannel(),
DEFAULT_REPL_TIMEOUT,
- getMapName());
- map.setExternalLoaders(getClassLoaders());
+ getMapName(),
+ getClassLoaders());
+ map.setChannelSendOptions(mapSendOptions);
this.sessions = map;
super.start();
} catch ( Exception x ) {
@@ -246,6 +251,9 @@
this.notifyListenersOnReplication = notifyListenersOnReplication;
}
+ public void setMapSendOptions(int mapSendOptions) {
+ this.mapSendOptions = mapSendOptions;
+ }
/*
* @see org.apache.catalina.ha.ClusterManager#getCluster()
@@ -253,7 +261,11 @@
public CatalinaCluster getCluster() {
return cluster;
}
-
+
+ public int getMapSendOptions() {
+ return mapSendOptions;
+ }
+
public String[] getInvalidatedSessions() {
return new String[0];
}
Modified:
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java
(original)
+++
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java
Wed Mar 22 19:59:38 2006
@@ -159,16 +159,22 @@
switch ( info.getType() ) {
case TYPE_ATTRIBUTE: {
if ( info.getAction() == ACTION_SET ) {
+ if ( log.isTraceEnabled() )
log.trace("Session.setAttribute('"+info.getName()+"', '"+info.getValue()+"')");
session.setAttribute(info.getName(),
info.getValue(),notifyListeners,false);
- } else
+ } else {
+ if ( log.isTraceEnabled() )
log.trace("Session.removeAttribute('"+info.getName()+"')");
session.removeAttribute(info.getName(),notifyListeners,false);
+ }
+
break;
}//case
case TYPE_ISNEW: {
+ if ( log.isTraceEnabled() )
log.trace("Session.setNew('"+info.getValue()+"')");
session.setNew(((Boolean)info.getValue()).booleanValue(),false);
break;
}//case
case TYPE_MAXINTERVAL: {
+ if ( log.isTraceEnabled() )
log.trace("Session.setMaxInactiveInterval('"+info.getValue()+"')");
session.setMaxInactiveInterval(((Integer)info.getValue()).intValue(),false);
break;
}//case
@@ -341,28 +347,30 @@
return other.getName().equals(this.getName());
}
- public synchronized void readExternal(java.io.ObjectInput in ) throws
java.io.IOException,
- java.lang.ClassNotFoundException {
+ public synchronized void readExternal(java.io.ObjectInput in ) throws
IOException,ClassNotFoundException {
//type - int
//action - int
//name - String
+ //hasvalue - boolean
//value - object
type = in.readInt();
action = in.readInt();
name = in.readUTF();
- value = in.readObject();
+ boolean hasValue = in.readBoolean();
+ if ( hasValue ) value = in.readObject();
}
- public synchronized void writeExternal(java.io.ObjectOutput out)
throws java.io.
- IOException {
+ public synchronized void writeExternal(java.io.ObjectOutput out)
throws IOException {
//type - int
//action - int
//name - String
+ //hasvalue - boolean
//value - object
out.writeInt(getType());
out.writeInt(getAction());
out.writeUTF(getName());
- out.writeObject(getValue());
+ out.writeBoolean(getValue()!=null);
+ if (getValue()!=null) out.writeObject(getValue());
}
public String toString() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]