User: slaboure
Date: 01/09/30 08:20:59
Added: src/main/org/jboss/ha/framework/server ClusterPartition.java
ClusterPartitionMBean.java
DistributedReplicantManagerImpl.java
DistributedStateImpl.java HAPartitionImpl.java
HARMIServerImpl.java
Log:
Moved the client-side files of the HA framework
from org.jboss.ha to org.jboss.ha.framework.interfaces.
Small code modifications.
Removed CR/LF issues.
Revision Changes Path
1.1
jbossmx/src/main/org/jboss/ha/framework/server/ClusterPartition.java
Index: ClusterPartition.java
===================================================================
package org.jboss.ha.framework.server;
import JavaGroups.JChannel;
import JavaGroups.Channel;
import org.jboss.logging.Logger;
import org.jboss.system.ServiceMBeanSupport;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.MalformedObjectNameException;
import org.apache.log4j.Category;
/**
 * JMX service that owns one cluster partition: it builds a JavaGroups
 * {@link JChannel} from the configured protocol stack, wraps it in an
 * {@link HAPartitionImpl} and drives that partition through the service
 * life cycle (init, start, stop).
 */
public class ClusterPartition extends ServiceMBeanSupport implements
      ClusterPartitionMBean
{
   /** Partition name; also passed to the channel as the group to connect to. */
   protected String partitionName = "DefaultPartition";

   /** JavaGroups protocol stack description handed to the JChannel constructor. */
   protected String jgProps =
      "UDP:" +
      "PING:" +
      "FD(trace=true;timeout=5000):" +
      "VERIFY_SUSPECT(trace=false;timeout=1500):" +
      "pbcast.NAKACK(trace=true):" +
      "UNICAST(timeout=5000;min_wait_time=2000):" +
      "FRAG:" +
      "pbcast.GMS:" +
      "pbcast.STATE_TRANSFER(trace=true):" +
      "QUEUE";

   /** Partition created by {@link #initService()}; null until then. */
   protected HAPartitionImpl partition;

   /** Flag forwarded to the HAPartitionImpl constructor. */
   protected boolean deadlock_detection = false;

   /** Channel created by {@link #initService()}; null until then. */
   protected JChannel channel;

   /** @return the partition name */
   public String getName()
   {
      return partitionName;
   }

   /** @return the partition name */
   public String getPartitionName()
   {
      return partitionName;
   }

   /** @param newName the partition name to use when the service is initialised */
   public void setPartitionName(String newName)
   {
      partitionName = newName;
   }

   /** @return the JavaGroups protocol stack description */
   public String getPartitionProperties()
   {
      return jgProps;
   }

   /** @param newProps the JavaGroups protocol stack description to use */
   public void setPartitionProperties(String newProps)
   {
      jgProps = newProps;
   }

   /** @return the deadlock detection flag */
   public boolean getDeadlockDetection()
   {
      return deadlock_detection;
   }

   /** @param doit new value of the deadlock detection flag */
   public void setDeadlockDetection(boolean doit)
   {
      deadlock_detection = doit;
   }

   /**
    * Returns the fixed object name of this service; both arguments are ignored.
    *
    * NOTE(review): OBJECT_NAME comes from ClusterPartitionMBean; confirm that
    * its value is really the name intended for the partition service.
    *
    * @throws javax.management.MalformedObjectNameException if OBJECT_NAME is not a valid name
    */
   public ObjectName getObjectName(MBeanServer server, ObjectName name)
      throws javax.management.MalformedObjectNameException
   {
      return new ObjectName(OBJECT_NAME);
   }

   /**
    * Creates the channel, sets its options and creates and initialises the
    * partition on top of it.
    *
    * @throws Exception if the channel or the partition cannot be set up
    */
   public void initService() throws Exception
   {
      log.info ("Creating JavaGroups JChannel");
      this.channel = new JChannel(jgProps);

      // Use the shared Boolean constants instead of allocating new wrappers.
      channel.SetOpt(Channel.LOCAL, Boolean.FALSE);
      channel.SetOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
      channel.SetOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
      channel.SetOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);

      log.info("Creating HAPartition...");
      partition = new HAPartitionImpl(partitionName, channel, deadlock_detection);
      log.info("...Initing HAPartition...");
      partition.init();
      log.info("...HAPartition initialized.");
   }

   /**
    * Connects the channel to the group named after the partition and starts
    * the partition.
    *
    * @throws Exception if connecting or starting fails
    */
   public void startService() throws Exception
   {
      log.info("Starting ClusterPartition: " + partitionName);
      log.info(" Connecting to channel");
      channel.Connect(partitionName);
      log.info(" Starting channel");
      partition.start();
      log.info("Started ClusterPartition: " + partitionName);
   }

   /**
    * Closes the partition. Failures are logged and never propagated.
    * If {@link #initService()} never got as far as creating the partition,
    * only the channel (if any) is closed.
    */
   public void stopService()
   {
      if (partition == null)
      {
         // Nothing was initialised; avoid a NullPointerException and release
         // the channel that initService() may already have created.
         if (channel != null)
         {
            try
            {
               channel.Close();
            }
            catch (Exception e)
            {
               log.error("Exception during shutdown", e);
            }
         }
         return;
      }

      try
      {
         log.info("Stopping HAPartition: " + partitionName);
         partition.close();
         log.info("Stopped HAPartition: " + partitionName);
      }
      catch (Exception e)
      {
         log.error("Exception during shutdown", e);
      }
   }
}
1.1
jbossmx/src/main/org/jboss/ha/framework/server/ClusterPartitionMBean.java
Index: ClusterPartitionMBean.java
===================================================================
package org.jboss.ha.framework.server;
/**
 * Management interface of the cluster partition service: exposes the
 * partition name, the JavaGroups stack properties and the deadlock
 * detection flag as read/write attributes.
 */
public interface ClusterPartitionMBean
   extends org.jboss.system.ServiceMBean
{
   // Constants -----------------------------------------------------

   /**
    * Object name string of the service.
    * NOTE(review): the value mentions "HANaming" although this interface
    * describes the partition service; confirm it is intended.
    */
   String OBJECT_NAME = ":service=HANaming";

   // Attributes ----------------------------------------------------

   /** @return the name of the partition */
   String getPartitionName();

   /** @param newName the name of the partition */
   void setPartitionName(String newName);

   /** @return the partition properties, i.e. the JavaGroups properties */
   String getPartitionProperties();

   /** @param newProps the partition properties, i.e. the JavaGroups properties */
   void setPartitionProperties(String newProps);

   /** @return the deadlock detection flag */
   boolean getDeadlockDetection();

   /** @param doit the deadlock detection flag */
   void setDeadlockDetection(boolean doit);
}
1.1
jbossmx/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
Index: DistributedReplicantManagerImpl.java
===================================================================
package org.jboss.ha.framework.server;
import java.util.Vector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Collection;
import java.rmi.server.RemoteStub;
import java.rmi.server.RemoteServer;
import java.rmi.server.UnicastRemoteObject;
import java.rmi.RemoteException;
import java.io.Serializable;
import org.jboss.logging.Logger;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.HAPartition;
public class DistributedReplicantManagerImpl
implements DistributedReplicantManager,
HAPartition.HAPartitionStateTransfer,
HAPartition.HAMembershipListener
{
protected HashMap replicants = new HashMap ();
protected HashMap keyListeners = new HashMap ();
protected HAPartition partition;
protected final static String SERVICE_NAME = "DistributedReplicantManager";
protected Logger log = null;
public DistributedReplicantManagerImpl (HAPartition partition) throws
RemoteException
{
this.partition = partition;
this.log = Logger.create (partition.getPartitionName () + ":ReplicantManager");
}
public void init () throws Exception
{
// When we subscribe to state transfer events, GetState will be called to
initialize
// this service. (SL: THIS IS NOT THE CASE NOW!!!)
//
log.debug ("registerRPCHandler");
partition.registerRPCHandler (SERVICE_NAME, this);
log.debug ("registerMembershipListener");
partition.registerMembershipListener (this);
log.debug ("subscribeToStateTransferEvents");
partition.subscribeToStateTransferEvents (SERVICE_NAME, this);
}
public void start () throws Exception
{
log.info ("Export myself as an RMI server");
RemoteStub stub = UnicastRemoteObject.exportObject (this);
log.info ("Register myself as a replicant of DistributedReplicantManager");
add (SERVICE_NAME, stub);
}
public void stop () throws Exception
{
UnicastRemoteObject.unexportObject (this, true);
}
/////////////////////
// State transfer API
/////////////////////
public Serializable getCurrentState ()
{
return replicants;
}
public void setCurrentState (Serializable newState)
{
synchronized (this.replicants)
{
replicants.clear ();
replicants.putAll ((HashMap)newState);
if (keyListeners.size () > 0)
cleanupKeyListeners ();
}
}
////////////////////
// Group membership API
////////////////////
public void membershipChanged (Vector deadMembers, Vector newMembers, Vector
allMembers)
{
// Here we only care about deadMembers. Purge all replicant lists of
deadMembers
// and then notify all listening nodes.
//
try
{
synchronized (this.replicants)
{
Iterator keys = replicants.keySet ().iterator ();
while (keys.hasNext ())
{
String key = (String)keys.next ();
HashMap replicant = (HashMap)replicants.get (key);
boolean modified = false;
for (int i = 0; i < deadMembers.size (); i++)
{
String node = deadMembers.elementAt (i).toString ();
Object removed = replicant.remove (node);
if (removed != null) modified = true;
}
if (modified)
{
notifyKeyListeners (key, new ArrayList (replicant.values ()));
}
}
}
}
catch (Exception ex)
{
log.error ("membershipChanged failed", ex);
}
}
///////////////
// DistributedReplicantManager API
///////////////
public void _add (String key, String nodeName, Serializable replicant)
{
log.info ("_add(" + key + ", " + nodeName);
try
{
synchronized(this.replicants)
{
HashMap rep = (HashMap)replicants.get (key);
if (rep == null)
{
log.info ("_adding new HashMap");
rep = new HashMap ();
replicants.put (key, rep);
}
rep.put (nodeName, replicant);
log.info ("notifyingKeyListeners");
notifyKeyListeners (key, new ArrayList (rep.values ()));
}
}
catch (Exception ex)
{
log.error ("_add failed", ex);
}
}
public void add (String key, Serializable replicant) throws Exception
{
Object[] args =
{key, partition.getNodeName (), replicant};
partition.callMethodOnCluster (SERVICE_NAME, "_add", args, false);
}
public void _remove (String key, String nodeName)
{
try
{
synchronized(this.replicants)
{
HashMap replicant = (HashMap)replicants.get (key);
if (replicant == null) return;
Object removed = replicant.remove (nodeName);
if (removed != null)
{
Collection values = replicant.values ();
notifyKeyListeners (key, new ArrayList (values));
if (values.size () == 0)
{
replicants.remove (key);
}
}
}
}
catch (Exception ex)
{
log.error ("_remove failed", ex);
}
}
public void remove (String key) throws Exception
{
Object[] args =
{key, partition.getNodeName ()};
partition.callMethodOnCluster (SERVICE_NAME, "_remove", args, false);
}
public Serializable lookupLocalReplicant (String key) throws Exception
{
synchronized(this.replicants)
{
HashMap replicant = (HashMap)replicants.get (key);
if (replicant == null) return null;
return (Serializable)replicant.get (partition.getNodeName ());
}
}
public ArrayList lookupReplicants (String key) throws Exception
{
synchronized(this.replicants)
{
HashMap replicant = (HashMap)replicants.get (key);
if (replicant == null) return null;
return new ArrayList (replicant.values ());
}
}
public void registerListener (String key,
DistributedReplicantManager.ReplicantListener subscriber) throws Exception
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get (key);
if (listeners == null)
{
listeners = new ArrayList ();
keyListeners.put (key, listeners);
}
listeners.add (subscriber);
}
}
public void unregisterListener (String key,
DistributedReplicantManager.ReplicantListener subscriber) throws Exception
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get (key);
if (listeners == null) return;
listeners.remove (subscriber);
if (listeners.size () == 0)
keyListeners.remove (key);
}
}
protected void notifyKeyListeners (String key, ArrayList newReplicants)
{
log.info ("notifyKeyListeners");
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get (key);
if (listeners == null)
{
log.debug ("listeners is null");
return;
}
log.debug ("notifying " + listeners.size () + " listeners for key change: "
+ key);
for (int i = 0; i < listeners.size (); i++)
{
DistributedReplicantManager.ReplicantListener listener =
(DistributedReplicantManager.ReplicantListener)listeners.get (i);
listener.replicantsChanged (key, newReplicants);
}
}
}
protected void cleanupKeyListeners ()
{
// NOT IMPLEMENTED YET
}
}
1.1
jbossmx/src/main/org/jboss/ha/framework/server/DistributedStateImpl.java
Index: DistributedStateImpl.java
===================================================================
package org.jboss.ha.framework.server;
import java.util.Vector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Collection;
import java.rmi.server.RemoteStub;
import java.rmi.Remote;
import java.rmi.server.UnicastRemoteObject;
import java.io.Serializable;
import org.jboss.logging.Logger;
import org.jboss.ha.framework.interfaces.DistributedState;
import org.jboss.ha.framework.interfaces.HAPartition;
public class DistributedStateImpl
implements Remote, DistributedState,
HAPartition.HAPartitionStateTransfer,
HAPartition.HAMembershipListener
{
protected HashMap categories = new HashMap ();
protected HashMap keyListeners = new HashMap ();
protected HAPartition partition;
protected final static String SERVICE_NAME = "DistributedState";
protected Logger log = null;
public DistributedStateImpl (HAPartition partition)
{
this.partition = partition;
this.log = Logger.create (this.getClass ());
}
public void init () throws Exception
{
// When we subscribe to state transfer events, GetState will be called to
initialize
// this service.
//
partition.subscribeToStateTransferEvents (SERVICE_NAME, this);
partition.registerMembershipListener (this);
partition.registerRPCHandler (SERVICE_NAME, this);
}
public void start () throws Exception
{
// Export myself as an RMI server
//
RemoteStub stub = UnicastRemoteObject.exportObject (this);
// bind myself as a replicant service
//
partition.getDistributedReplicantManager ().add (SERVICE_NAME, stub);
}
public void stop () throws Exception
{
//partition.getDistributedReplicantManager ().remove (SERVICE_NAME);
UnicastRemoteObject.unexportObject (this, true);
}
/////////////////////
// State transfer API
/////////////////////
public Serializable getCurrentState ()
{
return categories;
}
public void setCurrentState (Serializable newState)
{
synchronized (this.categories)
{
categories.clear ();
categories.putAll ((HashMap)newState);
if (keyListeners.size () > 0)
{
cleanupKeyListeners ();
}
}
}
////////////////////
// Group membership API
////////////////////
public void membershipChanged (Vector deadMembers, Vector newMembers, Vector
allMembers)
{
// If membership changes, this has no effect on the distributed state
//
}
///////////////
// DistributedReplicantManager API
///////////////
public void _set (String category, String key, Serializable value) throws
Exception
{
synchronized(this.categories)
{
HashMap cat = (HashMap)categories.get (category);
if (cat == null)
{
cat = new HashMap ();
categories.put (category, cat);
}
cat.put (key, value);
notifyKeyListeners (category, key, value);
}
}
public void set (String category, String key, Serializable value) throws Exception
{
Object[] args =
{category, key, value};
partition.callMethodOnCluster (SERVICE_NAME, "_set", args, false);
}
public void _remove (String category, String key) throws Exception
{
synchronized(this.categories)
{
HashMap cat = (HashMap)categories.get (category);
if (cat == null) return;
Object removed = cat.remove (key);
if (removed != null)
{
notifyKeyListenersOfRemove (category, key, (Serializable)removed);
if (cat.size () == 0)
{
categories.remove (category);
}
}
}
}
public void remove (String category, String key) throws Exception
{
Object[] args =
{category, key};
partition.callMethodOnCluster (SERVICE_NAME, "_remove", args, false);
}
public Serializable get (String category, String key)
{
synchronized(this.categories)
{
HashMap cat = (HashMap)categories.get (category);
if (cat == null) return null;
return (Serializable)cat.get (key);
}
}
public Collection getAllCategories ()
{
synchronized(this.categories)
{
return categories.keySet ();
}
}
public Collection getAllKeys (String category)
{
synchronized(this.categories)
{
HashMap cat = (HashMap)categories.get (category);
if (cat == null) return null;
return cat.keySet ();
}
}
public Collection getAllValues (String category)
{
synchronized(this.categories)
{
HashMap cat = (HashMap)categories.get (category);
if (cat == null) return null;
return cat.values ();
}
}
public void registerDSListener (String category, DistributedState.DSListener
subscriber)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get (category);
if (listeners == null)
{
listeners = new ArrayList ();
keyListeners.put (category, listeners);
}
listeners.add (subscriber);
}
}
public void unregisterDSListener (String category, DistributedState.DSListener
subscriber)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get (category);
if (listeners == null) return;
listeners.remove (subscriber);
if (listeners.size () == 0)
{
keyListeners.remove (category);
}
}
}
protected void notifyKeyListeners (String category, String key, Serializable
value)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get (category);
if (listeners == null) return;
for (int i = 0; i < listeners.size (); i++)
{
DistributedState.DSListener listener =
(DistributedState.DSListener)listeners.get (i);
listener.valueHasChanged (category, key, value);
}
}
}
protected void notifyKeyListenersOfRemove (String category, String key,
Serializable oldContent)
{
synchronized(this.keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get (category);
if (listeners == null) return;
for (int i = 0; i < listeners.size (); i++)
{
DistributedState.DSListener listener =
(DistributedState.DSListener)listeners.get (i);
listener.keyHasBeenRemoved (category, key, oldContent);
}
}
}
protected void cleanupKeyListeners ()
{
// NOT IMPLEMENTED YET
}
}
1.1
jbossmx/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
Index: HAPartitionImpl.java
===================================================================
/*
* JBoss, the OpenSource J2EE WebOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.ha.framework.server;
import JavaGroups.MethodLookup;
import JavaGroups.MethodLookupClos;
import JavaGroups.MethodLookupJava;
import JavaGroups.MembershipListener;
import JavaGroups.MessageListener;
import JavaGroups.JChannel;
import JavaGroups.Channel;
import JavaGroups.Message;
import JavaGroups.MethodCall;
import JavaGroups.RpcDispatcher;
import JavaGroups.RspList;
import JavaGroups.Rsp;
import JavaGroups.GroupRequest;
import JavaGroups.View;
import JavaGroups.Address;
import JavaGroups.Util;
import java.util.Vector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.io.Serializable;
import javax.naming.InitialContext;
import javax.naming.Reference;
import javax.naming.StringRefAddr;
import javax.naming.Name;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.NameNotFoundException;
import org.jboss.naming.NonSerializableFactory;
import java.lang.reflect.Method;
import org.jboss.logging.Logger;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedState;
public class HAPartitionImpl extends RpcDispatcher implements HAPartition,
MembershipListener, MessageListener
{
protected HashMap rpcHandlers = new HashMap ();
protected HashMap stateHandlers = new HashMap ();
protected ArrayList listeners = new ArrayList ();
protected Vector members = null;
protected String partitionName;
protected String nodeName;
protected int timeout = 0;
protected JChannel channel = null;
protected DistributedReplicantManagerImpl replicantManager = null;
protected DistributedStateImpl dsManager = null;
protected Logger log = null;
final MethodLookup method_lookup_clos=new MethodLookupClos ();
public HAPartitionImpl (String partitionName, JChannel channel, boolean
deadlock_detection) throws Exception
{
super(channel, null, null, new Object (), false); // init RpcDispatcher with a
fake target object
this.log = Logger.create ("HAPartition:" + partitionName);
this.channel = channel;
this.partitionName = partitionName;
}
public void init () throws Exception
{
// Subscribe to dHA events comming generated by the JavaGroups protocol stack
//
log.debug ("SetMembershipListener");
SetMembershipListener (this);
log.debug ("SetMessageListener");
SetMessageListener (this);
// Create the DRM and link it to this HAPartition
//
log.debug ("create replicant manager");
this.replicantManager = new DistributedReplicantManagerImpl (this);
log.debug ("init replicant manager");
this.replicantManager.init ();
log.debug ("bind replicant manager");
// Create the DS and link it to this HAPartition
//
log.debug ("create distributed state");
this.dsManager = new DistributedStateImpl (this);
log.debug ("init distributed state service");
this.dsManager.init ();
log.debug ("bind distributed state service");
// Bind ourself in the public JNDI space
//
Context ctx = new InitialContext ();
this.bind ("/HAPartition/" + partitionName, this, HAPartitionImpl.class, ctx);
log.info ("done initing..");
}
protected void bind (String jndiName, Object who, Class classType, Context ctx)
throws Exception
{
// Ah ! This service isn't serializable, so we use a helper class
//
NonSerializableFactory.bind (jndiName, who);
Name n = ctx.getNameParser ("").parse (jndiName);
while (n.size () > 1)
{
String ctxName = n.get (0);
try
{
ctx = (Context)ctx.lookup (ctxName);
}
catch (NameNotFoundException e)
{
log.info ("creating Subcontext" + ctxName);
ctx = ctx.createSubcontext (ctxName);
}
n = n.getSuffix (1);
}
// The helper class NonSerializableFactory uses address type nns, we go on to
// use the helper class to bind the service object in JNDI
//
StringRefAddr addr = new StringRefAddr ("nns", jndiName);
Reference ref = new Reference ( classType.getName (), addr,
NonSerializableFactory.class.getName (), null);
ctx.bind (n.get (0), ref);
}
public void start () throws Exception
{
// get current JG group properties
//
log.debug ("get nodeName");
this.nodeName = channel.GetLocalAddress ().toString ();
log.debug ("Get current members");
View view = channel.GetView ();
this.members = view.GetMembers ();
log.info ("Num cluster members: " + members.size ());
// We must now syncrhonize new state transfer subscriber
//
boolean rc = channel.GetState (null, 8000);
if (rc)
log.info ("State was retrieved successfully");
else
log.info ("State could not be retrieved, (must be first member of group)");
// We start now able to start our DRM and DS
//
this.replicantManager.start ();
this.dsManager.start ();
}
public void close () throws Exception
{
// Stop the DRM and DS services
//
try {this.replicantManager.stop ();} catch (Exception printed)
{printed.printStackTrace ();}
try {this.dsManager.stop ();} catch (Exception printed)
{printed.printStackTrace ();}
try {channel.Close ();} catch (Exception printed) {printed.printStackTrace ();}
new InitialContext ().unbind ("/HAPartition/" + partitionName);
}
public String getNodeName ()
{
return nodeName;
}
public String getPartitionName ()
{
return partitionName;
}
public DistributedReplicantManager getDistributedReplicantManager ()
{
return replicantManager;
}
public DistributedState getDistributedStateService ()
{
return this.dsManager;
}
// ***************************
// ***************************
// RPC multicast communication
// ***************************
// ***************************
//
public void registerRPCHandler (String objName, Object subscriber)
{
rpcHandlers.put (objName, subscriber);
}
public void unregisterRPCHandler (String objName, Object subscriber)
{
rpcHandlers.remove (objName);
}
/**
* This function is an abstraction of RpcDispatcher.
*/
public ArrayList callMethodOnCluster (String objName, String methodName, Object[]
args, boolean excludeSelf) throws Exception
{
ArrayList rtn = new ArrayList ();
MethodCall m = new MethodCall (objName + "." + methodName, args);
RspList rsp = this.CallRemoteMethods (null, m, GroupRequest.GET_ALL, timeout);
if (rsp != null)
{
for (int i = 0; i < rsp.size (); i++)
{
Object item = rsp.elementAt (i);
if (item instanceof Rsp)
{
item = ((Rsp)item).GetValue ();
}
rtn.add (item);
}
}
if (!excludeSelf)
{
m.SetName (methodName);
Object handler = rpcHandlers.get (objName);
Object retval=m.Invoke (handler, method_lookup_clos);
if (retval != null) rtn.add (retval);
}
return rtn;
}
// *************************
// *************************
// State transfer management
// *************************
// *************************
//
public void subscribeToStateTransferEvents (String objectName,
HAPartition.HAPartitionStateTransfer subscriber)
{
stateHandlers.put (objectName, subscriber);
}
public void unsubscribeFromStateTransferEvents (String objectName,
HAPartition.HAPartitionStateTransfer subscriber)
{
stateHandlers.remove (objectName);
}
// MessageListener methods
//
public Object GetState ()
{
log.info ("GetState called.");
try
{
// we now get the sub-state of each HAPartitionStateTransfer subscribers and
// build a "macro" state
//
HashMap state = new HashMap ();
Iterator keys = stateHandlers.keySet ().iterator ();
while (keys.hasNext ())
{
String key = (String)keys.next ();
HAPartition.HAPartitionStateTransfer subscriber =
(HAPartition.HAPartitionStateTransfer)stateHandlers.get (key);
log.debug ("GetState for " + key);
state.put (key, subscriber.getCurrentState ());
}
return state;
}
catch (Exception ex)
{
log.error ("GetState failed", ex);
}
return null;
}
public void SetState (Object obj)
{
try
{
log.info ("SetState called");
if (obj == null)
{
log.info ("state is null");
return;
}
HashMap state = (HashMap)obj;
Iterator keys = state.keySet ().iterator ();
while (keys.hasNext ())
{
String key = (String)keys.next ();
log.info ("SetState for " + key);
Object someState = state.get (key);
HAPartition.HAPartitionStateTransfer subscriber =
(HAPartition.HAPartitionStateTransfer)stateHandlers.get (key);
if (subscriber != null)
{
subscriber.setCurrentState ((Serializable)someState);
}
else
{
log.info ("There is no stateHandler for: " + key);
}
}
}
catch (Exception ex)
{
log.error ("SetState failed", ex);
}
}
public void Receive (Message msg)
{ /* complete */}
// *************************
// *************************
// Group Membership listeners
// *************************
// *************************
//
public void registerMembershipListener (HAPartition.HAMembershipListener listener)
{
synchronized(this.listeners)
{
this.listeners.add (listener);
}
}
public void unregisterMembershipListener (HAPartition.HAMembershipListener
listener)
{
synchronized(this.listeners)
{
this.listeners.remove (listener);
}
}
// MembershipListener required methods
public void Suspect (Address suspected_mbr) { log.info ("Suspected member: " +
suspected_mbr); /* complete */ }
public void Block () {}
public void ViewAccepted (View newView)
{
try
{
if (this.members == null)
{
// Initial viewAccepted
//
this.members = newView.GetMembers ();
return;
}
Vector oldMembers = this.members;
Vector allMembers = newView.GetMembers ();
Vector deadMembers = getDeadMembers (oldMembers, allMembers);
Vector newMembers = getNewMembers (oldMembers, allMembers);
this.members = allMembers;
log.info ("membership changed from " + oldMembers.size () + " to " +
allMembers.size ());
// Broadcast the new view to the view change listeners
//
synchronized(this.listeners)
{
for (int i = 0; i < listeners.size (); i++)
{
try
{
((HAPartition.HAMembershipListener)listeners.get
(i)).membershipChanged (deadMembers, newMembers, allMembers);
}
catch (Exception printed)
{
// a problem in a listener should not prevent other members to
receive the new view
//
printed.printStackTrace ();
}
}
}
}
catch (Exception ex)
{
log.error ("ViewAccepted failed", ex);
}
}
protected Vector getDeadMembers (Vector oldMembers, Vector newMembers)
{
Vector dead = new Vector ();
for (int i=0; i<oldMembers.size ();i++)
if (!newMembers.contains (oldMembers.elementAt (i)))
dead.add (oldMembers.elementAt (i));
return dead;
}
protected Vector getNewMembers (Vector oldMembers, Vector allMembers)
{
Vector newMembers = new Vector ();
for (int i=0; i<allMembers.size ();i++)
if (!oldMembers.contains (allMembers.elementAt (i)))
newMembers.add (allMembers.elementAt (i));
return newMembers;
}
///////////////////////////////////////////////////////////////
/**
* Message contains MethodCall. Execute it against *this* object and return
result.
* Use MethodCall.Invoke() to do this. Return result.
*
* This overrides RpcDispatcher.Handle so that we can dispatch to many different
objects.
*/
public Object Handle (Message req)
{
Object body = null;
Object retval = null;
MethodCall method_call = null;
if(req == null || req.GetBuffer () == null)
{
log.warn ("RpcProtocol.Handle(): message or message buffer is null !");
return null;
}
try
{
body=Util.ObjectFromByteBuffer (req.GetBuffer ());
}
catch(Exception e)
{
log.warn ("RpcProtocol.Handle(): " + e);
return null;
}
if(body == null || !(body instanceof MethodCall))
{
log.warn ("RpcProtocol.Handle(): message does not contain a MethodCall
object !");
return null;
}
// get method call informations
//
method_call=(MethodCall)body;
String methodName = method_call.GetName ();
int idx = methodName.indexOf ('.');
String handlerName = methodName.substring (0, idx);
String newMethodName = methodName.substring (idx + 1);
log.info ("Handle: " + methodName);
// prepare method call
//
method_call.SetName (newMethodName);
Object handler = rpcHandlers.get (handlerName);
// Invoke it
//
retval=method_call.Invoke (handler, method_lookup_clos);
return retval;
}
}
1.1
jbossmx/src/main/org/jboss/ha/framework/server/HARMIServerImpl.java
Index: HARMIServerImpl.java
===================================================================
package org.jboss.ha.framework.server;
import java.util.Vector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Collection;
import java.rmi.Remote;
import java.rmi.server.RemoteStub;
import java.rmi.server.RemoteServer;
import java.rmi.server.UnicastRemoteObject;
import java.io.Serializable;
import java.io.Externalizable;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.IOException;
import java.rmi.RemoteException;
import org.jboss.logging.Logger;
import org.jboss.ejb.plugins.jrmp.interfaces.RemoteMethodInvocation;
import java.rmi.MarshalledObject;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.HARMIServer;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
import org.jboss.ha.framework.interfaces.HARMIResponse;
import org.jboss.ha.framework.interfaces.HARMIClient;
import org.jboss.ha.framework.interfaces.HARMIProxy;
/**
 * RMI server end of an HA-RMI target: it exports itself, registers its stub
 * as a replicant of {@code replicantName}, keeps the current replicant list
 * up to date and executes incoming invocations on the wrapped handler.
 *
 * The replicant list and its time stamp ({@code lastSet}) are only read or
 * written while holding the lock of the {@code replicants} list.
 */
public class HARMIServerImpl
   implements HARMIServer,
      DistributedReplicantManager.ReplicantListener
{
   protected String replicantName;

   /** Time of the last replicant list update; guarded by {@link #replicants}. */
   protected long lastSet = System.currentTimeMillis ();

   /** Current replicants; the same instance is shared with created HA stubs. */
   protected ArrayList replicants = new ArrayList ();

   /** Object on which incoming invocations are executed. */
   protected Object handler;

   /** method hash (Long) -> Method of the handler class. */
   protected HashMap invokerMap = new HashMap ();

   protected HAPartition partition = null;
   protected Logger log;
   protected RemoteStub rmistub;
   protected Object stub;

   /** {@code partitionName/replicantName}; key in HARMIServer.rmiServers. */
   protected String key;

   protected Class intf;

   /**
    * Builds the method table of the handler, exports this object, registers
    * it as listener and replicant with the partition's replicant manager and
    * records it in HARMIServer.rmiServers.
    *
    * @param partition     partition to register with
    * @param replicantName key of the replicant
    * @param intf          interface implemented by the HA stubs
    * @param handler       target of the invocations
    */
   public HARMIServerImpl (HAPartition partition, String replicantName, Class intf,
      Object handler) throws Exception
   {
      this.replicantName = replicantName;
      this.partition = partition;
      this.handler = handler;
      this.log = Logger.create (this.getClass ());
      this.intf = intf;
      Method[] methods = handler.getClass ().getMethods ();
      for (int i = 0; i < methods.length; i++)
         invokerMap.put (new Long (RemoteMethodInvocation.calculateHash (methods[i])),
            methods[i]);
      this.rmistub = UnicastRemoteObject.exportObject (this);
      this.key = partition.getPartitionName () + "/" + replicantName;
      partition.getDistributedReplicantManager ().registerListener (replicantName, this);
      partition.getDistributedReplicantManager ().add (replicantName, rmistub);
      HARMIServer.rmiServers.put (key, this);
   }

   /** @return the live replicant list (not a copy) */
   public ArrayList getReplicants () throws Exception
   {
      return replicants;
   }

   /**
    * Creates a dynamic proxy implementing {@code intf} and HARMIProxy whose
    * invocation handler is a HARMIClient using the given policy.
    */
   public Object createHAStub (LoadBalancePolicy policy)
   {
      HARMIClient client = new HARMIClient (replicants, policy, key, handler);
      return Proxy.newProxyInstance (
         intf.getClassLoader (),
         new Class[]
            { intf, HARMIProxy.class },
         client);
   }

   /**
    * Undoes the registrations made by the constructor. Every step is
    * attempted even if an earlier one fails; failures are logged.
    */
   public void destroy ()
   {
      try
      {
         this.partition.getDistributedReplicantManager ().unregisterListener
            (this.replicantName, this);
      }
      catch (Exception e)
      {
         log.error ("Failed to unregister replicant listener for " + this.replicantName, e);
      }
      try
      {
         this.partition.getDistributedReplicantManager ().remove (this.replicantName);
      }
      catch (Exception e)
      {
         log.error ("Failed to remove replicant " + this.replicantName, e);
      }
      HARMIServer.rmiServers.remove (key);
      try
      {
         UnicastRemoteObject.unexportObject (this, true);
      }
      catch (Exception e)
      {
         log.error ("Failed to unexport " + key, e);
      }
   }

   /** @return the time of the last replicant list update */
   public long getTag ()
   {
      synchronized(replicants)
      {
         return lastSet;
      }
   }

   /** @return the wrapped handler */
   public Object getLocal () throws Exception
   {
      return handler;
   }

   /** Replaces the content of the replicant list and refreshes its time stamp. */
   public void replicantsChanged (String key, ArrayList newReplicants)
   {
      log.info ("replicantsChanged" + key + " to " + newReplicants.size ());
      synchronized(replicants)
      {
         // client has reference to replicants so it will automatically get
         // updated
         replicants.clear ();
         replicants.addAll (newReplicants);
         // updated under the same lock so readers see list and tag together
         lastSet = System.currentTimeMillis ();
      }
   }

   /**
    * Executes the marshalled invocation on the handler.
    *
    * @param tag  replicant list time stamp known to the caller; if it is older
    *             than ours, the response also carries the current list and tag
    * @param mimo marshalled RemoteMethodInvocation
    * @return the response wrapping the handler's return value
    * @throws Exception whatever the handler method throws, or a reflection failure
    */
   public HARMIResponse invoke (long tag, MarshalledObject mimo) throws Exception
   {
      RemoteMethodInvocation rmi = (RemoteMethodInvocation)mimo.get ();
      rmi.setMethodMap (invokerMap);
      Method method = rmi.getMethod ();

      HARMIResponse rsp = new HARMIResponse ();
      // Same lock as replicantsChanged(): never copy a half-updated list.
      synchronized(replicants)
      {
         if (tag < lastSet)
         {
            rsp.newReplicants = new ArrayList (replicants);
            rsp.tag = lastSet;
         }
      }

      try
      {
         rsp.response = method.invoke (handler, rmi.getArguments ());
         return rsp;
      }
      catch (InvocationTargetException ite)
      {
         // Unwrap what the handler threw. The target may be an Error, which
         // must not be cast to Exception.
         Throwable cause = ite.getTargetException ();
         if (cause instanceof Exception) throw (Exception)cause;
         if (cause instanceof Error) throw (Error)cause;
         throw ite;
      }
   }
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development