Author: fhanik
Date: Wed Mar 22 07:14:44 2006
New Revision: 387872
URL: http://svn.apache.org/viewcvs?rev=387872&view=rev
Log:
ok, we finally got control of it, fully working now
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/ClusterManagerBase.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java?rev=387872&r1=387871&r2=387872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
Wed Mar 22 07:14:44 2006
@@ -17,11 +17,10 @@
package org.apache.catalina.tribes.io;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
-import java.util.HashMap;
/**
* Custom subclass of <code>ObjectInputStream</code> that loads from the
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=387872&r1=387871&r2=387872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
Wed Mar 22 07:14:44 2006
@@ -518,12 +518,18 @@
return deserialize(data,0,data.length);
}
- public static Serializable deserialize(byte[] data, int offset, int
length)
- throws IOException, ClassNotFoundException, ClassCastException {
+ public static Serializable deserialize(byte[] data, int offset, int
length)
+ throws IOException, ClassNotFoundException, ClassCastException {
+ return deserialize(data,offset,length,new ClassLoader[]
{XByteBuffer.class.getClassLoader()});
+ }
+
+ public static Serializable deserialize(byte[] data, int offset, int
length, ClassLoader[] cls)
+ throws IOException, ClassNotFoundException, ClassCastException {
Object message = null;
+ if ( cls == null ) cls = new ClassLoader[]
{XByteBuffer.class.getClassLoader()};
if (data != null) {
InputStream instream = new
ByteArrayInputStream(data,offset,length);
- ReplicationStream stream = new ReplicationStream(instream,new
ClassLoader[] {XByteBuffer.class.getClassLoader()});
+ ReplicationStream stream = new ReplicationStream(instream,cls);
message = stream.readObject();
instream.close();
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=387872&r1=387871&r2=387872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
Wed Mar 22 07:14:44 2006
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@@ -34,7 +33,6 @@
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.io.DirectByteArrayOutputStream;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.mcast.MemberImpl;
import org.apache.commons.logging.Log;
@@ -81,7 +79,9 @@
private transient Object stateMutex = new Object();
private transient ArrayList mapMembers = new ArrayList();
private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
- private transient MapOwner mapOwner;
+ private transient Object mapOwner;
+ private transient ClassLoader[] externalLoaders;
+
//------------------------------------------------------------------------------
// CONSTRUCTORS
@@ -95,7 +95,7 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public AbstractReplicatedMap(MapOwner owner,
+ public AbstractReplicatedMap(Object owner,
Channel channel,
long timeout,
String mapContextName,
@@ -111,7 +111,7 @@
return new Member[] {m};
}
- private void init(MapOwner owner, Channel channel, String mapContextName,
long timeout, int channelSendOptions) {
+ private void init(Object owner, Channel channel, String mapContextName,
long timeout, int channelSendOptions) {
this.mapOwner = owner;
this.channelSendOptions = channelSendOptions;
@@ -131,22 +131,29 @@
this.channel.addChannelListener(this);
this.channel.addMembershipListener(this);
+ broadcast(MapMessage.MSG_START,true);
+
+ //transfer state from another map
+ transferState();
+ }
+
+ private void broadcast(int msgtype, boolean rpc) {
try {
//send out a map membership message, only wait for the first reply
- MapMessage msg = new MapMessage(this.mapContextName,
MapMessage.MSG_START,
+ MapMessage msg = new MapMessage(this.mapContextName, msgtype,
false, null, null, null,
wrap(channel.getLocalMember(false)));
- Response[] resp = rpcChannel.send(channel.getMembers(), msg,
rpcChannel.FIRST_REPLY, channelSendOptions, timeout);
- for (int i = 0; i < resp.length; i++) {
- mapMemberAdded(resp[i].getSource());
- messageReceived(resp[i].getMessage(), resp[i].getSource());
+ if ( rpc) {
+ Response[] resp = rpcChannel.send(channel.getMembers(), msg,
rpcChannel.FIRST_REPLY, channelSendOptions,rpcTimeout);
+ for (int i = 0; i < resp.length; i++) {
+ mapMemberAdded(resp[i].getSource());
+ messageReceived(resp[i].getMessage(), resp[i].getSource());
+ }
+ } else {
+ channel.send(channel.getMembers(),msg,channelSendOptions);
}
} catch (ChannelException x) {
log.warn("Unable to send map start message.");
}
-
- //transfer state from another map
- transferState();
- printMap();
}
public void breakdown() {
@@ -154,16 +161,7 @@
}
public void finalize() {
- try {
- //send a map membership stop message
- MapMessage msg = new MapMessage(this.mapContextName,
MapMessage.MSG_STOP,
- false, null, null, null,
wrap(channel.getLocalMember(false)));
- if (channel != null) channel.send(channel.getMembers(),
msg,channel.SEND_OPTIONS_DEFAULT);
-
- } catch (ChannelException x) {
- log.warn("Unable to send stop message.", x);
- }
-
+ broadcast(MapMessage.MSG_STOP,false);
//cleanup
if (this.rpcChannel != null) {
this.rpcChannel.breakdown();
@@ -177,9 +175,10 @@
this.mapMembers.clear();
super.clear();
this.stateTransferred = false;
+ this.externalLoaders = null;
}
-
//------------------------------------------------------------------------------
+//------------------------------------------------------------------------------
// GROUP COM INTERFACES
//------------------------------------------------------------------------------
public Member[] getMapMembers() {
@@ -270,22 +269,29 @@
msg = (MapMessage) resp[0].getMessage();
ArrayList list = (ArrayList) msg.getValue();
for (int i = 0; i < list.size(); i++) {
- MapMessage m = (MapMessage) list.get(i);
-
- //make sure we don't store that actual object as
primary or backup
- MapEntry local = (MapEntry)super.get(m.getKey());
- if (local != null && (!local.isProxy())) continue;
-
- //store the object
- if (m.getValue()!=null && m.getValue() instanceof
ReplicatedMapEntry ) {
-
((ReplicatedMapEntry)m.getValue()).setOwner(getMapOwner());
- }
- MapEntry entry = new MapEntry(m.getKey(),
m.getValue());
- entry.setBackup(false);
- entry.setProxy(true);
- entry.setBackupNodes(m.getBackupNodes());
- super.put(entry.getKey(), entry);
- }
+
messageReceived((Serializable)list.get(i),resp[0].getSource());
+// MapMessage m = (MapMessage) list.get(i);
+// try {
+// m.deserialize(getExternalLoaders());
+// //make sure we don't store that actual object as
primary or backup
+// MapEntry local = (MapEntry)super.get(m.getKey());
+// if (local != null && (!local.isProxy()))
continue;
+//
+// //store the object
+// if (m.getValue()!=null && m.getValue()
instanceof ReplicatedMapEntry ) {
+//
((ReplicatedMapEntry)m.getValue()).setOwner(getMapOwner());
+// }
+// MapEntry entry = new MapEntry(m.getKey(),
m.getValue());
+// entry.setBackup(false);
+// entry.setProxy(true);
+// entry.setBackupNodes(m.getBackupNodes());
+// super.put(entry.getKey(), entry);
+// } catch (IOException x) {
+// log.error("Unable to deserialize MapMessage.",
x);
+// } catch (ClassNotFoundException x) {
+// log.error("Unable to deserialize MapMessage.",
x);
+// }
+ }//for
}
}
} catch (ChannelException x) {
@@ -312,7 +318,6 @@
//backup request
if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
- System.out.println("Received a retrieve request for
id:"+mapmsg.getKey());
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if (entry == null)return null;
mapmsg.setValue( (Serializable) entry.getValue());
@@ -352,18 +357,29 @@
if (! (msg instanceof MapMessage))return;
MapMessage mapmsg = (MapMessage) msg;
- if (mapmsg.getMsgType() == MapMessage.MSG_START) {
- mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ try {
+ mapmsg.deserialize(getExternalLoaders());
+ if (mapmsg.getMsgType() == MapMessage.MSG_START) {
+ mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ }
+ } catch (IOException x ) {
+ log.error("Unable to deserialize MapMessage.",x);
+ } catch (ClassNotFoundException x ) {
+ log.error("Unable to deserialize MapMessage.",x);
}
}
public void messageReceived(Serializable msg, Member sender) {
- //todo implement all the messages that we can receive
- //messages we can receive are MSG_PROXY, MSG_BACKUP
- if (! (msg instanceof MapMessage))return;
+ if (! (msg instanceof MapMessage)) return;
MapMessage mapmsg = (MapMessage) msg;
-
+ try {
+ mapmsg.deserialize(getExternalLoaders());
+ } catch (IOException x) {
+ log.error("Unable to deserialize MapMessage.", x);
+ } catch (ClassNotFoundException x) {
+ log.error("Unable to deserialize MapMessage.", x);
+ }
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
mapMemberAdded(mapmsg.getBackupNodes()[0]);
}
@@ -385,7 +401,6 @@
}
if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
- System.out.println("Received a backup request for
id:"+mapmsg.getKey());
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if (entry == null) {
entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
@@ -424,7 +439,6 @@
} //end if
super.put(entry.getKey(), entry);
} //end if
- printMap();
}
public boolean accept(Serializable msg, Member sender) {
@@ -435,15 +449,11 @@
}
public void mapMemberAdded(Member member) {
- System.out.println("Received Member added:"+member.getName());
if ( member.equals(getChannel().getLocalMember(false)) ) return;
- System.out.println("Received Member added2:"+member.getName());
//select a backup node if we don't have one
synchronized (mapMembers) {
if (!mapMembers.contains(member) ) mapMembers.add(member);
}
- System.out.println("Received Member added3:"+member.getName());
- printMap();
synchronized (stateMutex) {
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
@@ -514,9 +524,10 @@
// METHODS TO OVERRIDE
//------------------------------------------------------------------------------
- protected void printMap() {
+ protected void printMap(String header) {
try {
- System.out.println("\nMap["+((Object)this).toString()+"; " + new
String(mapContextName, chset) + ", Map Size:" + super.size());
+ System.out.println("\nDEBUG MAP:"+header);
+ System.out.println("Map["+((Object)this).toString()+"; " + new
String(mapContextName, chset) + ", Map Size:" + super.size());
Member[] mbrs = getMapMembers();
for ( int i=0; i<mbrs.length;i++ ) {
System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName());
@@ -535,17 +546,6 @@
}
//------------------------------------------------------------------------------
-// Map Owner - serialization/deserialization
-//------------------------------------------------------------------------------
- public static interface MapOwner {
-
- public byte[] serialize(Object mapObject) throws IOException;
-
- public Serializable deserialize(byte[] data) throws
ClassNotFoundException,IOException;
-
- }
-
-//------------------------------------------------------------------------------
// Map Entry class
//------------------------------------------------------------------------------
public static class MapEntry implements Map.Entry {
@@ -672,6 +672,8 @@
private boolean diff;
private Serializable key;
private Serializable value;
+ private byte[] valuedata;
+ private byte[] keydata;
private byte[] diffvalue;
private Member[] nodes;
@@ -688,6 +690,11 @@
this.diffvalue = diffvalue;
this.nodes = nodes;
}
+
+ public void deserialize(ClassLoader[] cls) throws IOException,
ClassNotFoundException {
+ key(cls);
+ value(cls);
+ }
public int getMsgType() {
return msgtype;
@@ -698,12 +705,44 @@
}
public Serializable getKey() {
- return key;
+ try {
+ return key(null);
+ } catch ( Exception x ) {
+ log.error("Deserialization error of the MapMessage.key",x);
+ return null;
+ }
}
+ public Serializable key(ClassLoader[] cls) throws IOException,
ClassNotFoundException {
+ if ( key!=null ) return key;
+ if ( keydata == null || keydata.length == 0 ) return null;
+ key = XByteBuffer.deserialize(keydata,0,keydata.length,cls);
+ return key;
+ }
+
+ public byte[] getKeyData() {
+ return keydata;
+ }
+
public Serializable getValue() {
+ try {
+ return value(null);
+ } catch ( Exception x ) {
+ log.error("Deserialization error of the MapMessage.value",x);
+ return null;
+ }
+ }
+
+ public Serializable value(ClassLoader[] cls) throws IOException,
ClassNotFoundException {
+ if ( value!=null ) return value;
+ if ( valuedata == null || valuedata.length == 0 ) return null;
+ value = XByteBuffer.deserialize(valuedata,0,valuedata.length,cls);
return value;
}
+
+ public byte[] getValueData() {
+ return valuedata;
+ }
public byte[] getDiffValue() {
return diffvalue;
@@ -735,6 +774,12 @@
}
return members;
}
+
+ protected byte[] readBytes(ObjectInput in) throws IOException {
+ byte[] data = new byte[in.readInt()];
+ in.read(data);
+ return data;
+ }
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
mapId = new byte[in.readInt()];
@@ -744,27 +789,26 @@
case MSG_BACKUP:
case MSG_STATE: {
diff = in.readBoolean();
- key = (Serializable) in.readObject();
+ keydata = readBytes(in);
if (diff) {
- diffvalue = new byte[in.readInt()];
- in.read(diffvalue);
+ diffvalue = readBytes(in);
} else {
- value = (Serializable) in.readObject();
+ valuedata = readBytes(in);
} //endif
nodes = readMembers(in);
break;
}
case MSG_RETRIEVE_BACKUP: {
- key = (Serializable) in.readObject();
- value = (Serializable) in.readObject();
+ keydata = readBytes(in);
+ valuedata = readBytes(in);
break;
}
case MSG_REMOVE: {
- key = (Serializable) in.readObject();
+ keydata = readBytes(in);
break;
}
case MSG_PROXY: {
- key = (Serializable) in.readObject();
+ keydata = readBytes(in);
this.nodes = readMembers(in);
break;
}
@@ -772,8 +816,7 @@
case MSG_STOP: {
nodes = readMembers(in);
break;
- }
-
+ }
} //switch
} //readExternal
@@ -789,6 +832,16 @@
}
}
+ protected void writeBytes(ObjectOutput out, byte[] data) throws
IOException {
+ out.writeInt(data.length);
+ out.write(data);
+ }
+
+ protected void writeObject(ObjectOutput out, Serializable o) throws
IOException {
+ byte[] data = XByteBuffer.serialize(o);
+ writeBytes(out,data);
+ }
+
public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(mapId.length);
out.write(mapId);
@@ -797,27 +850,27 @@
case MSG_BACKUP:
case MSG_STATE: {
out.writeBoolean(diff);
- out.writeObject(key);
+ writeObject(out,key);
if (diff) {
out.writeInt(diffvalue.length);
out.write(diffvalue);
} else {
- out.writeObject(value);
+ writeObject(out,value);
} //endif
writeMembers(out,nodes);
break;
}
case MSG_RETRIEVE_BACKUP: {
- out.writeObject(key);
- out.writeObject(value);
+ writeObject(out,key);
+ writeObject(out,value);
break;
}
case MSG_REMOVE: {
- out.writeObject(key);
+ writeObject(out,key);
break;
}
case MSG_PROXY: {
- out.writeObject(key);
+ writeObject(out,key);
writeMembers(out,nodes);
break;
}
@@ -834,7 +887,10 @@
* @return Object
*/
public Object clone() {
- return new MapMessage(this.mapId, this.msgtype, this.diff,
this.key, this.value, this.diffvalue, this.nodes);
+ MapMessage msg = new MapMessage(this.mapId, this.msgtype,
this.diff, this.key, this.value, this.diffvalue, this.nodes);
+ msg.keydata = this.keydata;
+ msg.valuedata = this.valuedata;
+ return msg;
}
} //MapMessage
@@ -867,8 +923,16 @@
return mapOwner;
}
- public void setMapOwner(MapOwner mapOwner) {
+ public ClassLoader[] getExternalLoaders() {
+ return externalLoaders;
+ }
+
+ public void setMapOwner(Object mapOwner) {
this.mapOwner = mapOwner;
+ }
+
+ public void setExternalLoaders(ClassLoader[] externalLoaders) {
+ this.externalLoaders = externalLoaders;
}
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=387872&r1=387871&r2=387872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Wed Mar 22 07:14:44 2006
@@ -29,7 +29,6 @@
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.tipis.AbstractReplicatedMap.MapOwner;
/**
* A smart implementation of a stateful replicated map. uses primary/secondary
backup strategy.
@@ -85,7 +84,7 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public LazyReplicatedMap(MapOwner owner, Channel channel, long
timeout, String mapContextName, int initialCapacity, float loadFactor) {
+ public LazyReplicatedMap(Object owner, Channel channel, long timeout,
String mapContextName, int initialCapacity, float loadFactor) {
super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor,
Channel.SEND_OPTIONS_DEFAULT);
}
@@ -96,7 +95,7 @@
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
*/
- public LazyReplicatedMap(MapOwner owner, Channel channel, long
timeout, String mapContextName, int initialCapacity) {
+ public LazyReplicatedMap(Object owner, Channel channel, long timeout,
String mapContextName, int initialCapacity) {
super(owner, channel,timeout,mapContextName,initialCapacity,
LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
}
@@ -106,7 +105,7 @@
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
*/
- public LazyReplicatedMap(MapOwner owner, Channel channel, long
timeout, String mapContextName) {
+ public LazyReplicatedMap(Object owner, Channel channel, long timeout,
String mapContextName) {
super(owner, channel,timeout,mapContextName,
LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
}
@@ -143,19 +142,15 @@
}
public Object get(Object key) {
- System.out.println("Getting session id:"+key);
- printMap();
MapEntry entry = (MapEntry)super.get(key);
if ( entry == null ) return null;
if ( !entry.isPrimary() ) {
//if the message is not primary, we need to retrieve the latest
value
try {
-
Member[] backup = null;
MapMessage msg = null;
if ( !entry.isBackup() ) {
//make sure we don't retrieve from ourselves
- System.out.println("Retrieving from remote session
id:"+key);
msg = new MapMessage(getMapContextName(),
MapMessage.MSG_RETRIEVE_BACKUP, false,
(Serializable) key, null, null, null);
Response[] resp =
getRpcChannel().send(entry.getBackupNodes(),msg,
this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT,
getRpcTimeout());
@@ -228,7 +223,6 @@
log.error("Unable to replicate out data for a
LazyReplicatedMap.put operation", x);
}
super.put(key,entry);
- printMap();
return old;
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=387872&r1=387871&r2=387872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
Wed Mar 22 07:14:44 2006
@@ -29,7 +29,6 @@
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.tipis.AbstractReplicatedMap.MapOwner;
/**
* All-to-all replication for a hash map implementation. Each node in the
cluster will carry an identical
@@ -67,7 +66,7 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String
mapContextName, int initialCapacity,float loadFactor) {
+ public ReplicatedMap(Object owner, Channel channel, long timeout, String
mapContextName, int initialCapacity,float loadFactor) {
super(owner,channel, timeout, mapContextName, initialCapacity,
loadFactor, Channel.SEND_OPTIONS_DEFAULT);
}
@@ -78,7 +77,7 @@
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
*/
- public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String
mapContextName, int initialCapacity) {
+ public ReplicatedMap(Object owner, Channel channel, long timeout, String
mapContextName, int initialCapacity) {
super(owner,channel, timeout, mapContextName, initialCapacity,
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
}
@@ -88,7 +87,7 @@
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
*/
- public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String
mapContextName) {
+ public ReplicatedMap(Object owner, Channel channel, long timeout, String
mapContextName) {
super(owner, channel, timeout,
mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY,
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
}
Modified:
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java?rev=387872&r1=387871&r2=387872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
(original)
+++
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
Wed Mar 22 07:14:44 2006
@@ -133,6 +133,19 @@
public Session createEmptySession() {
return new DeltaSession(this);
}
+
+ public ClassLoader[] getClassLoaders() {
+ Loader loader = null;
+ ClassLoader classLoader = null;
+ if (container != null) loader = container.getLoader();
+ if (loader != null) classLoader = loader.getClassLoader();
+ else classLoader = Thread.currentThread().getContextClassLoader();
+ if ( classLoader == Thread.currentThread().getContextClassLoader() ) {
+ return new ClassLoader[] {classLoader};
+ } else {
+ return new ClassLoader[]
{classLoader,Thread.currentThread().getContextClassLoader()};
+ }
+ }
/**
* Open Stream and use correct ClassLoader (Container) Switch
@@ -147,24 +160,8 @@
}
public ReplicationStream getReplicationStream(byte[] data, int offset, int
length) throws IOException {
- ByteArrayInputStream fis =null;
- ReplicationStream ois = null;
- Loader loader = null;
- ClassLoader classLoader = null;
- //fix to be able to run the DeltaManager
- //stand alone without a container.
- //use the Threads context class loader
- if (container != null) loader = container.getLoader();
- if (loader != null) classLoader = loader.getClassLoader();
- else classLoader = Thread.currentThread().getContextClassLoader();
- //end fix
- fis = new ByteArrayInputStream(data, offset, length);
- if ( classLoader == Thread.currentThread().getContextClassLoader() ) {
- ois = new ReplicationStream(fis, new ClassLoader[] {classLoader});
- } else {
- ois = new ReplicationStream(fis, new ClassLoader[]
{classLoader,Thread.currentThread().getContextClassLoader()});
- }
- return ois;
+ ByteArrayInputStream fis = new ByteArrayInputStream(data, offset,
length);
+ return new ReplicationStream(fis, getClassLoaders());
}
@@ -186,16 +183,15 @@
*/
public void start() throws LifecycleException {
if ( this.started ) return;
-
-
- //start the javagroups channel
try {
CatalinaCluster catclust = (CatalinaCluster)cluster;
catclust.addManager(getName(), this);
- this.sessions = new LazyReplicatedMap(this,
- catclust.getChannel(),
- DEFAULT_REPL_TIMEOUT,
- getMapName());
+ LazyReplicatedMap map = new LazyReplicatedMap(this,
+
catclust.getChannel(),
+ DEFAULT_REPL_TIMEOUT,
+ getMapName());
+ map.setExternalLoaders(getClassLoaders());
+ this.sessions = map;
super.start();
} catch ( Exception x ) {
log.error("Unable to start BackupManager",x);
Modified:
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/ClusterManagerBase.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/ClusterManagerBase.java?rev=387872&r1=387871&r2=387872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/ClusterManagerBase.java
(original)
+++
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/ClusterManagerBase.java
Wed Mar 22 07:14:44 2006
@@ -32,6 +32,20 @@
*/
public abstract class ClusterManagerBase extends ManagerBase implements
Lifecycle, PropertyChangeListener, ClusterManager{
+
+ public ClassLoader[] getClassLoaders() {
+ Loader loader = null;
+ ClassLoader classLoader = null;
+ if (container != null) loader = container.getLoader();
+ if (loader != null) classLoader = loader.getClassLoader();
+ else classLoader = Thread.currentThread().getContextClassLoader();
+ if ( classLoader == Thread.currentThread().getContextClassLoader() ) {
+ return new ClassLoader[] {classLoader};
+ } else {
+ return new ClassLoader[]
{classLoader,Thread.currentThread().getContextClassLoader()};
+ }
+ }
+
/**
* Open Stream and use correct ClassLoader (Container) Switch
* ThreadClassLoader
@@ -45,27 +59,9 @@
}
public ReplicationStream getReplicationStream(byte[] data, int offset, int
length) throws IOException {
- ByteArrayInputStream fis =null;
- ReplicationStream ois = null;
- Loader loader = null;
- ClassLoader classLoader = null;
- //fix to be able to run the DeltaManager
- //stand alone without a container.
- //use the Threads context class loader
- if (container != null)
- loader = container.getLoader();
- if (loader != null)
- classLoader = loader.getClassLoader();
- else
- classLoader = Thread.currentThread().getContextClassLoader();
- //end fix
- fis = new ByteArrayInputStream(data,offset,length);
- if ( classLoader == Thread.currentThread().getContextClassLoader() ) {
- ois = new ReplicationStream(fis, new ClassLoader[] {classLoader});
- } else {
- ois = new ReplicationStream(fis, new ClassLoader[]
{classLoader,Thread.currentThread().getContextClassLoader()});
- }
- return ois;
+ ByteArrayInputStream fis = new ByteArrayInputStream(data, offset,
length);
+ return new ReplicationStream(fis, getClassLoaders());
}
+
}
Modified:
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java?rev=387872&r1=387871&r2=387872&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java
(original)
+++
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java
Wed Mar 22 07:14:44 2006
@@ -336,7 +336,6 @@
public void setOwner(Object owner) {
if ( owner instanceof ClusterManager && getManager()==null) {
- System.out.println("Setting owner for
session:"+getIdInternal()+" to:"+owner);
ClusterManager cm = (ClusterManager)owner;
this.setManager(cm);
this.setValid(true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]