User: patriot1burke
Date: 01/09/13 20:53:29
Added: src/main/org/jboss/ha RoundRobin.java LoadBalancePolicy.java
HARMITarget.java HAPartitionImpl.java
HAPartition.java DistributedStateImpl.java
DistributedState.java
DistributedReplicantManagerImpl.java
DistributedReplicantManager.java
ClusterPartitionMBean.java ClusterPartition.java
Log:
clustering. Work in Progress. This code probably doesn't work, but it
does compile
Revision Changes Path
1.1 jbossmx/src/main/org/jboss/ha/RoundRobin.java
Index: RoundRobin.java
===================================================================
package org.jboss.ha;
/**
 * Load-balancing policy that hands out the targets one after the other.
 *
 * <p>The cursor is advanced <em>before</em> the array is read, so the first
 * call on a fresh instance returns the element at index 1 (index 0 when there
 * is only one target). The cursor is transient: a deserialized copy starts
 * again from 0. This class does no synchronization of its own.
 */
public class RoundRobin implements LoadBalancePolicy
{
   /** Index of the target returned by the previous call. */
   protected transient int cursorRemote = 0;

   /**
    * Picks the next target in rotation.
    *
    * @param targets candidate targets; may change size between calls
    * @return the chosen element, or <code>null</code> when
    *         <code>targets</code> is <code>null</code> or empty
    */
   public Object chooseTarget(Object[] targets)
   {
      // An empty array used to fail with "/ by zero"; report "nothing to
      // choose" instead so callers can treat it like an exhausted list.
      if (targets == null || targets.length == 0)
      {
         return null;
      }
      cursorRemote = (cursorRemote + 1) % targets.length;
      return targets[cursorRemote];
   }
}
1.1 jbossmx/src/main/org/jboss/ha/LoadBalancePolicy.java
Index: LoadBalancePolicy.java
===================================================================
package org.jboss.ha;
/**
 * Strategy used to pick one target out of a list of equivalent targets.
 * Implementations are serializable.
 */
public interface LoadBalancePolicy extends java.io.Serializable
{
   /**
    * Selects the target that should be used next.
    *
    * @param targets the candidate targets
    * @return one of the candidates
    */
   Object chooseTarget(Object[] targets);
}
1.1 jbossmx/src/main/org/jboss/ha/HARMITarget.java
Index: HARMITarget.java
===================================================================
/*
* JBoss, the OpenSource J2EE WebOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.ha;
import java.util.Vector;
import java.util.ArrayList;
import java.rmi.ConnectException;
import java.rmi.ConnectIOException;
import java.rmi.NoSuchObjectException;
import java.rmi.UnknownHostException;
import java.rmi.RemoteException;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
/**
*
*
* @author <a href="mailto:[EMAIL PROTECTED]">Sacha Labourey</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Bill Burke</a>
* @version $Revision: 1.1 $
*
* <p><b>Revisions:</b>
*
* <p><b>20010831 Bill Burke:</b>
* <ul>
* <li> First import of sources
* </ul>
*/
public class HARMITarget implements java.io.Serializable,
java.lang.reflect.InvocationHandler
{
protected volatile long DELAY_FOR_RESYNCH_AFTER_DEAD_TARGET = 5000; // 5 seconds
protected volatile long DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE = 30000; // 30 seconds
protected transient volatile long nextResynch;
protected transient volatile boolean updatingData = false;
protected ArrayList managers = new ArrayList(0);
protected Object[] targets = new Object[0];
protected String targetName;
protected LoadBalancePolicy loadBalancePolicy;
public HARMITarget(String targetName,
long normalResynch,
long deathResynch,
ArrayList managers,
Object[] targets,
LoadBalancePolicy policy)
{
this.targetName = targetName;
DELAY_FOR_RESYNCH_AFTER_DEAD_TARGET = deathResynch;
DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE = normalResynch;
nextResynch = System.currentTimeMillis () +
DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE;
updatingData = false;
this.managers = managers;
this.targets = targets;
this.loadBalancePolicy = policy;
}
public ArrayList getManagers()
{
return managers;
}
public void setManagers(ArrayList newManagers)
{
synchronized(managers)
{
managers = newManagers;
}
}
public Object[] getTargets()
{
return targets;
}
public void setTargets(Object[] newTargets)
{
synchronized(targets)
{
targets = newTargets;
}
}
public Object getRemoteTarget()
{
if (targets.length == 0)
{
findTargetsSynchronously ();
if (targets.length == 0)
return null;
}
synchronized (targets)
{
tick();
return loadBalancePolicy.chooseTarget(targets);
}
}
public void remoteTargetHasFailed(Object target)
{
removeDeadTarget(target);
}
protected void removeDeadTarget (Object target)
{
//System.out.println("Size before : " + Integer.toString(targets.length));
if (targets != null)
{
synchronized (targets)
{
//System.out.println("removeDeadTarget has been called");
int length = targets.length;
for (int i=0; i<length; ++i)
{
if (targets[i] == target)
{
Object[] copy = new Object[length - 1];
System.arraycopy(targets, 0, copy, 0, i);
if ( (i+1) < length)
{
System.arraycopy(targets, i+1, copy, i, length - i - 1);
}
targets = copy;
resynchAtLeastAt(System.currentTimeMillis () +
DELAY_FOR_RESYNCH_AFTER_DEAD_TARGET);
tick();
return;
}
}
}
}
// nothing found
}
protected void tick()
{
if (!updatingData) // this is done in order to do not have to wait on
asynchronous updates
{
boolean resynch = false;
synchronized (this)
{
long now = System.currentTimeMillis ();
resynch = (now >= nextResynch);
if (resynch)
{
//System.out.println("Tick has decided to resynch. Next auto resynh
at " + Long.toString(now + DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE ) + " (now is " +
Long.toString (now) + " and old update value was: " + Long.toString(nextResynch) +
")");
nextResynch = ( now + DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE ) ;
}
}
if (resynch)
findTargetsAsynchronously ();
}
}
protected void resynchAtLeastAt(long when)
{
// only keep more urgent resynch need
//
if ( (nextResynch == 0) || (when < nextResynch) )
nextResynch = when;
}
protected void updateTargets()
{
//System.out.println("updateTargets has been called");
for (int i=0; i < this.managers.size(); i++)
{
try
{
DistributedReplicantManager supplier =
(DistributedReplicantManager)managers.get(i);
ArrayList replicants = supplier.lookupReplicants(targetName);
ArrayList newManagers =
supplier.lookupReplicants("DistributedReplicantManager");
resynchAtLeastAt(System.currentTimeMillis () +
DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE);
synchronized (targets)
{
this.targets = replicants.toArray();
}
synchronized(managers)
{
this.managers = newManagers;
}
return;
}
catch (Exception e)
{/*e.printStackTrace ();*/} // during debug phase only!!!
}
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
Object target = getRemoteTarget();
while (target != null)
{
try
{
// Is this step actually necessary? Can I just do method.invoke(target,
args); ?
Method m = target.getClass().getMethod(method.getName(),
method.getParameterTypes());
return m.invoke(target, args);
}
catch (IllegalAccessException iae)
{
throw iae;
}
catch (IllegalArgumentException iae)
{
throw iae;
}
catch (InvocationTargetException ite)
{
Throwable targetException = ite.getTargetException();
if (targetException instanceof ConnectException
|| targetException instanceof ConnectIOException
|| targetException instanceof NoSuchObjectException
|| targetException instanceof UnknownHostException)
{
// ignore
}
else
{
throw targetException;
}
}
// If we reach here, this means that we must fail-over
remoteTargetHasFailed(target);
target = getRemoteTarget();
}
// if we get here this means list was exhausted
throw new RemoteException("Service unavailable.");
}
protected void findTargetsSynchronously()
{
// if this is called, it is because our target list is empty
//
//System.out.println("findTargetsSynchronously has been called");
synchronized (this)
{
if (targets.length > 0)
return; // the list has been probably refilled during our wait for the
monitor
updateTargets ();
}
}
protected void findTargetsAsynchronously()
{
//System.out.println("findTargetsAsynchronously has been called");
updatingData = true;
Thread t = new Thread ( new Runnable ()
{
public void run ()
{
try
{
synchronized(this)
{
updateTargets ();
}
} finally
{
updatingData = false;
}
}
});
t.start();
}
}
1.1 jbossmx/src/main/org/jboss/ha/HAPartitionImpl.java
Index: HAPartitionImpl.java
===================================================================
/*
* JBoss, the OpenSource J2EE WebOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.ha;
import JavaGroups.MethodLookup;
import JavaGroups.MethodLookupClos;
import JavaGroups.MembershipListener;
import JavaGroups.MessageListener;
import JavaGroups.JChannel;
import JavaGroups.Channel;
import JavaGroups.Message;
import JavaGroups.MethodCall;
import JavaGroups.RpcDispatcher;
import JavaGroups.RspList;
import JavaGroups.GroupRequest;
import JavaGroups.View;
import JavaGroups.Address;
import JavaGroups.Util;
import java.util.Vector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.io.Serializable;
import javax.naming.InitialContext;
public class HAPartitionImpl extends RpcDispatcher implements HAPartition,
MembershipListener, MessageListener
{
protected HashMap rpcHandlers = new HashMap();
protected HashMap stateHandlers = new HashMap();
protected ArrayList listeners = new ArrayList();
protected String partitionName;
protected Vector members = null;
protected int timeout = 500;
protected JChannel channel = null;
protected DistributedReplicantManagerImpl replicantManager;
protected String nodeName;
final MethodLookup method_lookup_clos=new MethodLookupClos();
public HAPartitionImpl(String partitionName, JChannel channel,
boolean deadlock_detection) throws Exception
{
super(channel, null, null, null, deadlock_detection);
this.channel = channel;
this.partitionName = partitionName;
this.nodeName = channel.GetLocalAddress().toString();
}
public void init() throws Exception
{
SetMembershipListener(this);
SetMessageListener(this);
this.replicantManager = new DistributedReplicantManagerImpl(this);
this.replicantManager.init();
new InitialContext().bind("/HAPartition/" + partitionName, this);
}
public void close() throws Exception
{
channel.Close();
new InitialContext().unbind("/HAPartition/" + partitionName);
}
public String getNodeName()
{
return nodeName;
}
public String getPartitionName()
{
return partitionName;
}
public DistributedReplicantManager getDistributedReplicantManager()
{
return replicantManager;
}
// ***************************
// ***************************
// RPC multicast communication
// ***************************
// ***************************
//
public void registerRPCHandler(String objName, Object subscriber)
{
rpcHandlers.put(objName, subscriber);
}
public void unregisterRPCHandler(String objName, Object subscriber)
{
rpcHandlers.remove(objName);
}
/**
* This function is an abstraction of RpcDispatcher.
*/
public ArrayList callMethodOnCluster(String objName, String methodName, Object[]
args, boolean excludeSelf) throws Exception
{
MethodCall m = new MethodCall(objName + "." + methodName, args);
RspList rsp = this.CallRemoteMethods(null, m, GroupRequest.GET_ALL, timeout);
ArrayList rtn = new ArrayList(rsp.size());
for (int i = 0; i < rsp.size(); i++)
rtn.add(rsp.elementAt(i));
if (!excludeSelf)
{
Object handler = rpcHandlers.get(objName);
Object retval=m.Invoke(handler, method_lookup_clos);
rtn.add(retval);
}
return rtn;
}
// *************************
// *************************
// State transfer management
// *************************
// *************************
//
public void subscribeToStateTransferEvents(String objectName,
HAPartition.HAPartitionStateTransfer subscriber)
{
stateHandlers.put(objectName, subscriber);
// We must now syncrhonize new state transfer subscriber
boolean rc = channel.GetState(null, 8000);
/*
if (rc) System.out.println("State was retrieved successfully");
else System.out.println("State could not be retrieved, (must be first
member of group)");
*/
}
public void unsubscribeFromStateTransferEvents(String objectName,
HAPartition.HAPartitionStateTransfer subscriber)
{
stateHandlers.remove(objectName);
}
// MessageLister methods
public Object GetState()
{
HashMap state = new HashMap();
Iterator keys = stateHandlers.keySet().iterator();
while (keys.hasNext())
{
String key = (String)keys.next();
HAPartition.HAPartitionStateTransfer subscriber =
(HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
state.put(key, subscriber.getCurrentState());
}
return state;
}
public void SetState(Object obj)
{
HashMap state = (HashMap)obj;
Iterator keys = state.keySet().iterator();
while (keys.hasNext())
{
String key = (String)keys.next();
Object someState = state.get(key);
HAPartition.HAPartitionStateTransfer subscriber =
(HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
subscriber.setCurrentState((Serializable)someState);
}
}
public void Receive(Message msg) { /* complete */}
// *************************
// *************************
// Group Membership listeners
// *************************
// *************************
//
public void registerMembershipListener(HAPartition.HAMembershipListener listener)
{
synchronized(this.listeners)
{
this.listeners.add(listener);
}
}
public void unregisterMembershipListener(HAPartition.HAMembershipListener
listener)
{
synchronized(this.listeners)
{
int idx = listeners.indexOf(listener);
if (idx != -1)
listeners.remove(idx);
}
}
// MembershipListener required methods
public void Suspect(Address suspected_mbr)
{
// complete
}
public void Block() {}
public void ViewAccepted(View newView)
{
Vector oldMembers = this.members;
Vector allMembers = newView.GetMembers();
Vector deadMembers = getDeadMembers(oldMembers, allMembers);
Vector newMembers = getNewMembers(oldMembers, allMembers);
this.members = allMembers;
synchronized(this.listeners)
{
for (int i = 0; i < listeners.size(); i++)
{
((HAPartition.HAMembershipListener)listeners.get(i)).membershipChanged(deadMembers,
newMembers, allMembers);
}
}
}
protected Vector getDeadMembers (Vector oldMembers, Vector newMembers)
{
Vector dead = new Vector ();
for (int i=0; i<oldMembers.size ();i++)
if (!newMembers.contains (oldMembers.elementAt(i)))
dead.add (oldMembers.elementAt (i));
return dead;
}
protected Vector getNewMembers (Vector oldMembers, Vector allMembers)
{
Vector newMembers = new Vector ();
for (int i=0; i<allMembers.size ();i++)
if (!oldMembers.contains(allMembers.elementAt(i)))
newMembers.add (allMembers.elementAt(i));
return newMembers;
}
///////////////////////////////////////////////////////////////
/**
Message contains MethodCall. Execute it against *this* object and return
result.
Use MethodCall.Invoke() to do this. Return result.
This overrides RpcDispatcher.Handle so that we can dispatch to many different
objects.
*/
public Object Handle(Message req)
{
Object body=null, retval=null;
MethodCall method_call;
if(server_obj == null) {
System.err.println("RpcDispatcher.Handle(): no method handler is registered
! " +
"Discarding request.");
return null;
}
if(req == null || req.GetBuffer() == null) {
System.err.println("RpcProtocol.Handle(): message or message buffer is null
!");
return null;
}
try {
body=Util.ObjectFromByteBuffer(req.GetBuffer());
}
catch(Exception e) {
System.err.println("RpcProtocol.Handle(): " + e);
return null;
}
if(body == null || !(body instanceof MethodCall)) {
System.err.println("RpcProtocol.Handle(): message does not contain a
MethodCall object !");
return null;
}
method_call=(MethodCall)body;
String methodName = method_call.GetName();
int idx = methodName.indexOf('.');
String handlerName = methodName.substring(0, idx);
String newMethodName = methodName.substring(idx + 1);
method_call.SetName(newMethodName);
Object handler = rpcHandlers.get(handlerName);
retval=method_call.Invoke(handler, method_lookup_clos);
return retval;
}
}
1.1 jbossmx/src/main/org/jboss/ha/HAPartition.java
Index: HAPartition.java
===================================================================
/*
* JBoss, the OpenSource J2EE WebOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.ha;
import java.util.ArrayList;
import java.util.Vector;
import java.io.Serializable;
/**
 * A cluster partition as seen by the services running on one node: identity
 * of the node and partition, cluster-wide RPC, state transfer and membership
 * notification.
 */
public interface HAPartition
{
   // ---------------------------------------------------------------
   // General methods
   // ---------------------------------------------------------------

   /** @return the name of the local node */
   String getNodeName();

   /** @return the name of this partition */
   String getPartitionName();

   /** @return the replicant manager attached to this partition */
   DistributedReplicantManager getDistributedReplicantManager();

   // ---------------------------------------------------------------
   // RPC multicast communication
   // ---------------------------------------------------------------

   /** Registers the object that receives RPC calls addressed to objectName. */
   void registerRPCHandler(String objectName, Object handler);

   /** Removes the RPC handler registered under objectName. */
   void unregisterRPCHandler(String objectName, Object subscriber);

   /**
    * Invokes a method on the handler named objectName. The call goes to all
    * members of this partition on all nodes (not to sub-partitions or other
    * partitions).
    *
    * @param excludeSelf when true the local handler is not called
    * @return the collected responses
    */
   ArrayList callMethodOnCluster(String objectName, String methodName,
                                 Object[] args, boolean excludeSelf) throws Exception;

   // ---------------------------------------------------------------
   // State transfer management
   // ---------------------------------------------------------------

   /** Implemented by services whose state is transferred between nodes. */
   interface HAPartitionStateTransfer
   {
      /** @return the state to hand over */
      Serializable getCurrentState();

      /** Installs a state received from elsewhere. */
      void setCurrentState(Serializable newState);
   }

   void subscribeToStateTransferEvents(String objectName,
                                       HAPartition.HAPartitionStateTransfer subscriber);

   void unsubscribeFromStateTransferEvents(String objectName,
                                           HAPartition.HAPartitionStateTransfer subscriber);

   // ---------------------------------------------------------------
   // Group Membership listeners
   // ---------------------------------------------------------------

   /** Implemented by services that want to follow membership changes. */
   interface HAMembershipListener
   {
      /**
       * @param deadMembers members that left since the previous view
       * @param newMembers  members that joined since the previous view
       * @param allMembers  the complete current membership
       */
      void membershipChanged(Vector deadMembers, Vector newMembers,
                             Vector allMembers);
   }

   void registerMembershipListener(HAMembershipListener listener);

   void unregisterMembershipListener(HAMembershipListener listener);
}
1.1 jbossmx/src/main/org/jboss/ha/DistributedStateImpl.java
Index: DistributedStateImpl.java
===================================================================
package org.jboss.ha;
import java.util.Vector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Collection;
import java.rmi.server.RemoteServer;
import java.rmi.server.UnicastRemoteObject;
import java.io.Serializable;
public class DistributedStateImpl extends RemoteServer
implements DistributedState,
HAPartition.HAPartitionStateTransfer,
HAPartition.HAMembershipListener
{
protected HashMap categories = new HashMap();
protected HashMap keyListeners = new HashMap();
protected HAPartition partition;
protected final static String SERVICE_NAME = "DistributedState";
public DistributedStateImpl(HAPartition partition)
{
this.partition = partition;
}
public void init() throws Exception
{
// When we subscribe to state transfer events, GetState will be called to
initialize
// this service.
partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
partition.registerMembershipListener(this);
partition.registerRPCHandler(SERVICE_NAME, this);
// Export myself as an RMI server
UnicastRemoteObject.exportObject(this, 0);
// should bind myself as a replicant service? TODO
}
/////////////////////
// State transfer API
/////////////////////
public Serializable getCurrentState()
{
return categories;
}
public void setCurrentState(Serializable newState)
{
synchronized (this.categories)
{
categories.clear();
categories.putAll((HashMap)newState);
if (keyListeners.size() > 0)
{
cleanupKeyListeners();
}
}
}
////////////////////
// Group membership API
////////////////////
public void membershipChanged(Vector deadMembers, Vector newMembers, Vector
allMembers)
{
// If membership changes, this has no effect on the distributed state
//
}
///////////////
// DistributedReplicantManager API
///////////////
public void _set(String category, String key, Serializable value) throws Exception
{
synchronized(this.categories)
{
HashMap cat = (HashMap)categories.get(category);
if (cat == null)
{
cat = new HashMap();
categories.put (category, cat);
}
cat.put(key, value);
notifyKeyListeners(category, key, value);
}
}
public void set(String category, String key, Serializable value) throws Exception
{
Object[] args = {category, key, value};
partition.callMethodOnCluster(SERVICE_NAME, "_set", args, false);
}
public void _remove(String category, String key) throws Exception
{
synchronized(this.categories)
{
HashMap cat = (HashMap)categories.get(category);
if (cat == null) return;
Object removed = cat.remove(key);
if (removed != null)
{
notifyKeyListenersOfRemove(category, key, (Serializable)removed);
if (cat.size() == 0)
{
categories.remove(category);
}
}
}
}
public void remove(String category, String key) throws Exception
{
Object[] args = {category, key};
partition.callMethodOnCluster(SERVICE_NAME, "_remove", args, false);
}
public Serializable get(String category, String key)
{
synchronized(this.categories)
{
HashMap cat = (HashMap)categories.get(category);
if (cat == null) return null;
return (Serializable)cat.get(key);
}
}
public Collection getAllCategories()
{
synchronized(this.categories)
{
return categories.keySet();
}
}
public Collection getAllKeys(String category)
{
synchronized(this.categories)
{
HashMap cat = (HashMap)categories.get(category);
if (cat == null) return null;
return cat.keySet();
}
}
public Collection getAllValues(String category)
{
synchronized(this.categories)
{
HashMap cat = (HashMap)categories.get(category);
if (cat == null) return null;
return cat.values();
}
}
public void registerDSListener(String category, DistributedState.DSListener
subscriber)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get(category);
if (listeners == null)
{
listeners = new ArrayList();
keyListeners.put (category, listeners);
}
listeners.add(subscriber);
}
}
public void unregisterDSListener(String category, DistributedState.DSListener
subscriber)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get(category);
if (listeners == null) return;
int idx = listeners.indexOf(subscriber);
if (idx != -1)
{
listeners.remove(idx);
if (listeners.size() == 0)
{
keyListeners.remove(category);
}
}
}
}
protected void notifyKeyListeners(String category, String key, Serializable value)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get(category);
if (listeners == null) return;
for (int i = 0; i < listeners.size(); i++)
{
DistributedState.DSListener listener =
(DistributedState.DSListener)listeners.get(i);
listener.valueHasChanged(category, key, value);
}
}
}
protected void notifyKeyListenersOfRemove(String category, String key,
Serializable oldContent)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get(category);
if (listeners == null) return;
for (int i = 0; i < listeners.size(); i++)
{
DistributedState.DSListener listener =
(DistributedState.DSListener)listeners.get(i);
listener.keyHasBeenRemoved (category, key, oldContent);
}
}
}
protected void cleanupKeyListeners()
{
// NOT IMPLEMENTED YET
}
}
1.1 jbossmx/src/main/org/jboss/ha/DistributedState.java
Index: DistributedState.java
===================================================================
package org.jboss.ha;
import java.util.Collection;
import java.io.Serializable;
import java.rmi.Remote;
/**
 * A table of serializable values addressed by (category, key).
 */
public interface DistributedState extends Remote
{
   /**
    * Callback for changes inside one category. Listeners are registered per
    * category and are told about every key of that category that is set or
    * removed.
    */
   interface DSListener
   {
      /** The given key of the category now holds <code>value</code>. */
      void valueHasChanged(String category, String key, Serializable value);

      /** The given key was removed; <code>previousContent</code> is what it held. */
      void keyHasBeenRemoved(String category, String key,
                             Serializable previousContent);
   }

   /** Adds a listener for the given category. */
   void registerDSListener(String category, DSListener subscriber);

   /** Removes a listener from the given category. */
   void unregisterDSListener(String category, DSListener subscriber);

   // State binding methods

   /**
    * Removes a key from a category.
    */
   void remove(String category, String key) throws Exception;

   /**
    * Associates a value with a key in a specific category.
    */
   void set(String category, String key, Serializable value) throws Exception;

   /**
    * Looks up the value associated with a key in a category.
    */
   Serializable get(String category, String key);

   /**
    * Returns all categories.
    */
   Collection getAllCategories();

   /**
    * Returns all keys of a category.
    */
   Collection getAllKeys(String category);

   /**
    * Returns all values of a category.
    */
   Collection getAllValues(String category);
}
1.1
jbossmx/src/main/org/jboss/ha/DistributedReplicantManagerImpl.java
Index: DistributedReplicantManagerImpl.java
===================================================================
package org.jboss.ha;
import java.util.Vector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Collection;
import java.rmi.server.RemoteServer;
import java.rmi.server.UnicastRemoteObject;
import java.io.Serializable;
public class DistributedReplicantManagerImpl extends RemoteServer
implements DistributedReplicantManager,
HAPartition.HAPartitionStateTransfer,
HAPartition.HAMembershipListener
{
protected HashMap replicants = new HashMap();
protected HashMap keyListeners = new HashMap();
protected HAPartition partition;
protected final static String SERVICE_NAME = "DistributedReplicantManager";
public DistributedReplicantManagerImpl(HAPartition partition)
{
this.partition = partition;
}
public void init() throws Exception
{
// When we subscribe to state transfer events, GetState will be called to
initialize
// this service.
partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
partition.registerMembershipListener(this);
partition.registerRPCHandler(SERVICE_NAME, this);
// Export myself as an RMI server
UnicastRemoteObject.exportObject(this, 0);
// Register myself as a replicant of DistributedReplicantManager
add(SERVICE_NAME, this);
}
/////////////////////
// State transfer API
/////////////////////
public Serializable getCurrentState()
{
return replicants;
}
public void setCurrentState(Serializable newState)
{
synchronized (this.replicants)
{
replicants.clear();
replicants.putAll((HashMap)newState);
if (keyListeners.size() > 0)
{
cleanupKeyListeners();
}
}
}
////////////////////
// Group membership API
////////////////////
public void membershipChanged(Vector deadMembers, Vector newMembers, Vector
allMembers)
{
// Here we only care about deadMembers. Purge all replicant lists of
deadMembers
// and then notify all listening nodes.
synchronized (this.replicants)
{
Iterator keys = replicants.keySet().iterator();
while (keys.hasNext())
{
String key = (String)keys.next();
HashMap replicant = (HashMap)replicants.get(key);
boolean modified = false;
for (int i = 0; i < deadMembers.size(); i++)
{
String node = deadMembers.elementAt(i).toString();
Object removed = replicant.remove(node);
if (removed != null) modified = true;
}
if (modified)
{
notifyKeyListeners(key, new ArrayList(replicant.values()));
}
}
}
}
///////////////
// DistributedReplicantManager API
///////////////
public void _add(String key, String nodeName, Serializable replicant)
{
synchronized(this.replicants)
{
HashMap rep = (HashMap)replicants.get(key);
if (rep == null)
{
rep = new HashMap();
replicants.put (key, rep);
}
rep.put(nodeName, replicant);
notifyKeyListeners(key, new ArrayList(rep.values()));
}
}
public void add(String key, Serializable replicant) throws Exception
{
Object[] args = {key, partition.getNodeName(), replicant};
partition.callMethodOnCluster(SERVICE_NAME, "_add", args, false);
}
public void _remove(String key, String nodeName)
{
synchronized(this.replicants)
{
HashMap replicant = (HashMap)replicants.get(key);
if (replicant == null) return;
Object removed = replicant.remove(nodeName);
if (removed != null)
{
Collection values = replicant.values();
notifyKeyListeners(key, new ArrayList(values));
if (values.size() == 0)
{
replicants.remove(key);
}
}
}
}
public void remove(String key) throws Exception
{
Object[] args = {key, partition.getNodeName()};
partition.callMethodOnCluster(SERVICE_NAME, "_remove", args, false);
}
public Serializable lookupLocalReplicant(String key)
{
synchronized(this.replicants)
{
HashMap replicant = (HashMap)replicants.get(key);
if (replicant == null) return null;
return (Serializable)replicant.get(partition.getNodeName());
}
}
public ArrayList lookupReplicants(String key)
{
synchronized(this.replicants)
{
HashMap replicant = (HashMap)replicants.get(key);
if (replicant == null) return null;
return new ArrayList(replicant.values());
}
}
public void registerListener(String key,
DistributedReplicantManager.ReplicantListener subscriber)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get(key);
if (listeners == null)
{
listeners = new ArrayList();
keyListeners.put (key, listeners);
}
listeners.add(subscriber);
}
}
public void unregisterListener(String key,
DistributedReplicantManager.ReplicantListener subscriber)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get(key);
if (listeners == null) return;
int idx = listeners.indexOf(subscriber);
if (idx != -1)
{
listeners.remove(idx);
if (listeners.size() == 0)
{
keyListeners.remove(key);
}
}
}
}
protected void notifyKeyListeners(String key, ArrayList newReplicants)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get(key);
if (listeners == null) return;
for (int i = 0; i < listeners.size(); i++)
{
DistributedReplicantManager.ReplicantListener listener =
(DistributedReplicantManager.ReplicantListener)listeners.get(i);
listener.replicantsChanged(key, newReplicants);
}
}
}
protected void cleanupKeyListeners()
{
// NOT IMPLEMENTED YET
}
}
1.1 jbossmx/src/main/org/jboss/ha/DistributedReplicantManager.java
Index: DistributedReplicantManager.java
===================================================================
package org.jboss.ha;
import java.util.ArrayList;
import java.io.Serializable;
import java.rmi.Remote;
/**
 * Registry in which each cluster node can attach one replicant per key.
 */
public interface DistributedReplicantManager extends Remote
{
   /**
    * Callback for changes to the replicants of one key. Listeners are
    * registered per key and receive the new replicant list whenever it
    * changes.
    */
   interface ReplicantListener
   {
      void replicantsChanged(String key, ArrayList newReplicants);
   }

   /** Adds a listener for the given key. */
   void registerListener(String key, ReplicantListener subscriber);

   /** Removes a listener from the given key. */
   void unregisterListener(String key, ReplicantListener subscriber);

   // State binding methods

   /**
    * Removes the replicant this cluster node attached to the key.
    */
   void remove(String key) throws Exception;

   /**
    * Adds a replicant; it is attached to this cluster node.
    */
   void add(String key, Serializable replicant) throws Exception;

   /**
    * Looks up the replicant attached to this cluster node.
    */
   Serializable lookupLocalReplicant(String key);

   /**
    * Returns the replicants of all nodes for the key.
    */
   ArrayList lookupReplicants(String key);
}
1.1 jbossmx/src/main/org/jboss/ha/ClusterPartitionMBean.java
Index: ClusterPartitionMBean.java
===================================================================
package org.jboss.ha;
/**
 * Management interface of the ClusterPartition service: partition name,
 * channel properties and the deadlock-detection switch.
 */
public interface ClusterPartitionMBean
   extends org.jboss.system.ServiceMBean
{
   // Constants -----------------------------------------------------

   /** Object name under which the service registers. */
   String OBJECT_NAME = ":service=HANaming";

   // Attributes ----------------------------------------------------

   String getPartitionName();

   void setPartitionName(String newName);

   String getPartitionProperties();

   void setPartitionProperties(String newProps);

   boolean getDeadlockDetection();

   void setDeadlockDetection(boolean doit);
}
1.1 jbossmx/src/main/org/jboss/ha/ClusterPartition.java
Index: ClusterPartition.java
===================================================================
package org.jboss.ha;
import JavaGroups.JChannel;
import JavaGroups.Channel;
import org.jboss.logging.Logger;
import org.jboss.system.ServiceMBeanSupport;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.MalformedObjectNameException;
import org.apache.log4j.Category;
public class ClusterPartition extends ServiceMBeanSupport implements
ClusterPartitionMBean
{
protected String partitionName = "DefaultPartition";
protected String jgProps =
"UDP(mcast_addr=224.100.100.200;mcast_port=4567;ip_ttl=31;trace=true):" +
"PING(timeout=3000;num_initial_members=10):" +
"FD(trace=true;timeout=5000):" +
"VERIFY_SUSPECT(trace=false;timeout=1500):" +
"pbcast.NAKACK(trace=true):" +
"UNICAST(timeout=5000;min_wait_time=2000):" +
"FRAG:" +
"pbcast.GMS:" +
"pbcast.STATE_TRANSFER(trace=true)";
protected HAPartitionImpl partition;
protected boolean deadlock_detection = false;
public String getName()
{
return partitionName;
}
public String getPartitionName()
{
return partitionName;
}
public void setPartitionName(String newName)
{
partitionName = newName;
}
public String getPartitionProperties()
{
return jgProps;
}
public void setPartitionProperties(String newProps)
{
jgProps = newProps;
}
public boolean getDeadlockDetection()
{
return deadlock_detection;
}
public void setDeadlockDetection(boolean doit)
{
deadlock_detection = doit;
}
public ObjectName getObjectName(MBeanServer server, ObjectName name)
throws javax.management.MalformedObjectNameException
{
return new ObjectName(OBJECT_NAME);
}
public void startService()
throws Exception
{
log.info("Starting ClusterPartition: " + partitionName);
JChannel channel = new JChannel(jgProps);
channel.SetOpt(Channel.LOCAL, new Boolean(false));
channel.SetOpt(Channel.GET_STATE_EVENTS, new Boolean(true));
channel.Connect(partitionName);
partition = new HAPartitionImpl(partitionName, channel, deadlock_detection);
partition.init();
}
public void stopService()
{
try
{
log.info("Stopping ClusterPartition: " + partitionName);
partition.close();
}
catch (Exception e)
{
log.error("Exception during shutdown", e);
}
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development