User: slaboure
  Date: 01/09/30 08:20:59

  Added:       src/main/org/jboss/ha/framework/server ClusterPartition.java
                        ClusterPartitionMBean.java
                        DistributedReplicantManagerImpl.java
                        DistributedStateImpl.java HAPartitionImpl.java
                        HARMIServerImpl.java
  Log:
  Moved the client-side files of the HA framework
   from org.jboss.ha to org.jboss.ha.framework.interfaces.
  Small code modifications.
  Fixed CR/LF line-ending issues.
  
  Revision  Changes    Path
  1.1                  
jbossmx/src/main/org/jboss/ha/framework/server/ClusterPartition.java
  
  Index: ClusterPartition.java
  ===================================================================
  package org.jboss.ha.framework.server;
  
  
  import JavaGroups.JChannel;
  
  import JavaGroups.Channel;
  
  import org.jboss.logging.Logger;
  import org.jboss.system.ServiceMBeanSupport;
  
  import javax.management.MBeanServer;
  import javax.management.ObjectName;
  import javax.management.MalformedObjectNameException;
  
  import org.apache.log4j.Category;
  
  
  /**
   * JMX service that owns one cluster partition.
   *
   * It creates a JavaGroups channel from the configured protocol stack, builds
   * an {@link HAPartitionImpl} on top of that channel and drives both through
   * the service lifecycle (initService, startService, stopService).
   */
  public class ClusterPartition extends ServiceMBeanSupport implements ClusterPartitionMBean
  {
     /** Partition name; also used as the group name when the channel connects. */
     protected String partitionName = "DefaultPartition";

     /** Protocol stack description handed to the JChannel constructor. */
     protected String jgProps = 
        "UDP:" +
        "PING:" +
        "FD(trace=true;timeout=5000):" +
        "VERIFY_SUSPECT(trace=false;timeout=1500):" +
        "pbcast.NAKACK(trace=true):" +
        "UNICAST(timeout=5000;min_wait_time=2000):" +
        "FRAG:" +
        "pbcast.GMS:" +
        "pbcast.STATE_TRANSFER(trace=true):" +
        "QUEUE";

     /** Partition built by initService(); null until then. */
     protected HAPartitionImpl partition;

     /** Flag handed to the HAPartitionImpl constructor. */
     protected boolean deadlock_detection = false;

     /** Channel created by initService(); null until then. */
     protected JChannel channel;
     
     /** @return the partition name (same value as getPartitionName()). */
     public String getName()
     {
        return partitionName;
     }
  
     /** @return the name of the partition managed by this service. */
     public String getPartitionName()
     {
        return partitionName;
     }
  
     /** @param newName partition name used by the next initService()/startService(). */
     public void setPartitionName(String newName)
     {
        partitionName = newName;
     }
  
     /** @return the JavaGroups protocol stack description. */
     public String getPartitionProperties()
     {
        return jgProps;
     }
  
     /** @param newProps JavaGroups protocol stack description used by the next initService(). */
     public void setPartitionProperties(String newProps)
     {
        jgProps = newProps;
     }
  
     /** @return the deadlock-detection flag passed to the partition. */
     public boolean getDeadlockDetection()
     {
        return deadlock_detection;
     }
  
     /** @param doit deadlock-detection flag passed to the partition on the next initService(). */
     public void setDeadlockDetection(boolean doit)
     {
        deadlock_detection = doit;
     }
  
     /**
      * Returns the object name built from OBJECT_NAME; both arguments are ignored.
      *
      * NOTE(review): OBJECT_NAME is inherited from ClusterPartitionMBean and reads
      * ":service=HANaming" -- confirm that this is the intended name for this service.
      *
      * @throws javax.management.MalformedObjectNameException if OBJECT_NAME is not a valid name
      */
     public ObjectName getObjectName(MBeanServer server, ObjectName name)
        throws javax.management.MalformedObjectNameException
     {
        return new ObjectName(OBJECT_NAME);
     }
     
     /**
      * Creates the JavaGroups channel, sets its options, then creates and
      * initialises the HAPartition on top of it.
      *
      * @throws Exception if the channel or the partition cannot be created or initialised
      */
     public void initService() throws Exception
     {
        log.info ("Creating JavaGroups JChannel");
        this.channel = new JChannel(jgProps);
        // The shared Boolean constants carry the same values as freshly
        // allocated wrappers, without the allocation.
        channel.SetOpt(Channel.LOCAL, Boolean.FALSE);
        channel.SetOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
        channel.SetOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
        channel.SetOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
        
        log.info("Creating HAPartition...");
        partition = new HAPartitionImpl(partitionName, channel, deadlock_detection);
        
        log.info("...Initing HAPartition...");
        partition.init();
        
        log.info("...HAPartition initialized.");
     }
  
     /**
      * Connects the channel to the group named after the partition and starts
      * the partition.
      *
      * @throws Exception if the channel cannot connect or the partition cannot start
      */
     public void startService() throws Exception
     {
        log.info("Starting ClusterPartition: " + partitionName);
        
        log.info(" Connecting to channel");
        channel.Connect(partitionName);
        
        log.info(" Starting channel");
        partition.start();
        
        log.info("Started ClusterPartition: " + partitionName);
     }
     
     /**
      * Closes the partition. Failures are logged and never propagated.
      *
      * If initService() never got as far as building the partition, only the
      * channel (when one was created) is closed, instead of relying on a
      * caught NullPointerException.
      */
     public void stopService()
     {
        try
        {
           if (partition == null)
           {
              log.info("HAPartition was never created, nothing to stop: " + partitionName);
              if (channel != null)
              {
                 // No partition exists to close the channel for us.
                 channel.Close();
              }
              return;
           }

           log.info("Stopping HAPartition: " + partitionName);
           partition.close();
           log.info("Stopped HAPartition: " + partitionName);
        }
        catch (Exception e)
        {
           log.error("Exception during shutdown", e);
        }
     }
  }
  
  
  1.1                  
jbossmx/src/main/org/jboss/ha/framework/server/ClusterPartitionMBean.java
  
  Index: ClusterPartitionMBean.java
  ===================================================================
  package org.jboss.ha.framework.server;
  
  
  /**
   * Management interface of the cluster partition service: exposes the
   * partition name, the JavaGroups protocol stack description and the
   * deadlock-detection flag as read/write attributes.
   */
  public interface ClusterPartitionMBean
     extends org.jboss.system.ServiceMBean
  {
     // Constants -----------------------------------------------------

     /** Object name string for the service. */
     String OBJECT_NAME = ":service=HANaming";
  
     /** @return the partition name. */
     String getPartitionName();

     /** @param newName the partition name. */
     void setPartitionName(String newName);
  
     /** @return the partition properties, i.e. the JavaGroups properties. */
     String getPartitionProperties();

     /** @param newProps the partition properties, i.e. the JavaGroups properties. */
     void setPartitionProperties(String newProps);
  
     /** @return the deadlock-detection flag. */
     boolean getDeadlockDetection();

     /** @param doit the deadlock-detection flag. */
     void setDeadlockDetection(boolean doit);
  }
  
  
  
      
  
      
  
  
  
  
  1.1                  
jbossmx/src/main/org/jboss/ha/framework/server/DistributedReplicantManagerImpl.java
  
  Index: DistributedReplicantManagerImpl.java
  ===================================================================
  package org.jboss.ha.framework.server;
  
  
  import java.util.Vector;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.Collection;
  
  import java.rmi.server.RemoteStub;
  import java.rmi.server.RemoteServer;
  import java.rmi.server.UnicastRemoteObject;
  import java.rmi.RemoteException;
  
  import java.io.Serializable;
  
  import org.jboss.logging.Logger;
  
  import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
  import org.jboss.ha.framework.interfaces.HAPartition;
  
  
  public class DistributedReplicantManagerImpl
  implements DistributedReplicantManager,
  HAPartition.HAPartitionStateTransfer,
  HAPartition.HAMembershipListener
  {
     protected HashMap replicants = new HashMap ();
     protected HashMap keyListeners = new HashMap ();
     protected HAPartition partition;
     
     protected final static String SERVICE_NAME = "DistributedReplicantManager";
     
     protected Logger log = null;
     
     
     public DistributedReplicantManagerImpl (HAPartition partition) throws 
RemoteException
     {
        this.partition = partition;
        this.log = Logger.create (partition.getPartitionName () + ":ReplicantManager");
     }
     
     public void init () throws Exception
     {
        // When we subscribe to state transfer events, GetState will be called to 
initialize
        // this service. (SL: THIS IS NOT THE CASE NOW!!!)
        //
        log.debug ("registerRPCHandler");
        partition.registerRPCHandler (SERVICE_NAME, this);
        log.debug ("registerMembershipListener");
        partition.registerMembershipListener (this);
        log.debug ("subscribeToStateTransferEvents");
        partition.subscribeToStateTransferEvents (SERVICE_NAME, this);
     }
     
     public void start () throws Exception
     {
        log.info ("Export myself as an RMI server");
        RemoteStub stub = UnicastRemoteObject.exportObject (this);
        log.info ("Register myself as a replicant of DistributedReplicantManager");
        add (SERVICE_NAME, stub);
     }
     
     public void stop () throws Exception
     {
        UnicastRemoteObject.unexportObject (this, true);
     }
     
     /////////////////////
     // State transfer API
     /////////////////////
     public Serializable getCurrentState ()
     {
        return replicants;
     }
     
     public void setCurrentState (Serializable newState)
     {
        synchronized (this.replicants)
        {
           replicants.clear ();
           replicants.putAll ((HashMap)newState);
           if (keyListeners.size () > 0)
              cleanupKeyListeners ();
        }
     }
     
     ////////////////////
     // Group membership API
     ////////////////////
     public void membershipChanged (Vector deadMembers, Vector newMembers, Vector 
allMembers)
     {
        // Here we only care about deadMembers.  Purge all replicant lists of 
deadMembers
        // and then notify all listening nodes.
        //
        try
        {
           synchronized (this.replicants)
           {
              Iterator keys = replicants.keySet ().iterator ();
              while (keys.hasNext ())
              {
                 String key = (String)keys.next ();
                 HashMap replicant = (HashMap)replicants.get (key);
                 boolean modified = false;
                 for (int i = 0; i < deadMembers.size (); i++)
                 {
                    String node = deadMembers.elementAt (i).toString ();
                    Object removed = replicant.remove (node);
                    if (removed != null) modified = true;
                 }
                 if (modified)
                 {
                    notifyKeyListeners (key, new ArrayList (replicant.values ()));
                 }
              }
           }
        }
        catch (Exception ex)
        {
           log.error ("membershipChanged failed", ex);
        }
     }
     
     ///////////////
     // DistributedReplicantManager API
     ///////////////
     
     public void _add (String key, String nodeName, Serializable replicant)
     {
        log.info ("_add(" + key + ", " + nodeName);
        
        try
        {
           synchronized(this.replicants)
           {
              HashMap rep = (HashMap)replicants.get (key);
              if (rep == null)
              {
                 log.info ("_adding new HashMap");
                 rep = new HashMap ();
                 replicants.put (key, rep);
              }
              rep.put (nodeName, replicant);
              log.info ("notifyingKeyListeners");
              notifyKeyListeners (key, new ArrayList (rep.values ()));
           }
        }
        catch (Exception ex)
        {
           log.error ("_add failed", ex);
        }
     }
     
     public void add (String key, Serializable replicant) throws Exception
     {
        Object[] args =
        {key, partition.getNodeName (), replicant};
        partition.callMethodOnCluster (SERVICE_NAME, "_add", args, false);
     }
     
     public void _remove (String key, String nodeName)
     {
        try
        {
           synchronized(this.replicants)
           {
              HashMap replicant = (HashMap)replicants.get (key);
              if (replicant == null) return;
              Object removed = replicant.remove (nodeName);
              if (removed != null)
              {
                 Collection values = replicant.values ();
                 notifyKeyListeners (key, new ArrayList (values));
                 if (values.size () == 0)
                 {
                    replicants.remove (key);
                 }
              }
           }
        }
        catch (Exception ex)
        {
           log.error ("_remove failed", ex);
        }
     }
     
     public void remove (String key) throws Exception
     {
        Object[] args =
        {key, partition.getNodeName ()};
        partition.callMethodOnCluster (SERVICE_NAME, "_remove", args, false);
        
     }
     
     public Serializable lookupLocalReplicant (String key) throws Exception
     {
        synchronized(this.replicants)
        {
           HashMap replicant = (HashMap)replicants.get (key);
           if (replicant == null) return null;
           
           return (Serializable)replicant.get (partition.getNodeName ());
        }
     }
     
     public ArrayList lookupReplicants (String key) throws Exception
     {
        synchronized(this.replicants)
        {
           HashMap replicant = (HashMap)replicants.get (key);
           if (replicant == null) return null;
           
           return new ArrayList (replicant.values ());
        }
     }
     
     
     public void registerListener (String key, 
DistributedReplicantManager.ReplicantListener subscriber) throws Exception
     {
        synchronized(this.keyListeners)
        {
           ArrayList listeners = (ArrayList)keyListeners.get (key);
           if (listeners == null)
           {
              listeners = new ArrayList ();
              keyListeners.put (key, listeners);
           }
           listeners.add (subscriber);
        }
     }
     
     public void unregisterListener (String key, 
DistributedReplicantManager.ReplicantListener subscriber) throws Exception
     {
        synchronized(this.keyListeners)
        {
           ArrayList listeners = (ArrayList)keyListeners.get (key);
           if (listeners == null) return;
           
           listeners.remove (subscriber);
           if (listeners.size () == 0)
              keyListeners.remove (key);
        }
     }
     
     protected void notifyKeyListeners (String key, ArrayList newReplicants)
     {
        log.info ("notifyKeyListeners");
        
        synchronized(this.keyListeners)
        {
           ArrayList listeners = (ArrayList)keyListeners.get (key);
           if (listeners == null)
           {
              log.debug ("listeners is null");
              return;
           }
           
           log.debug ("notifying " + listeners.size () + " listeners for key change: " 
+ key);
           for (int i = 0; i < listeners.size (); i++)
           {
              DistributedReplicantManager.ReplicantListener listener = 
(DistributedReplicantManager.ReplicantListener)listeners.get (i);
              listener.replicantsChanged (key, newReplicants);
           }
        }
     }
     
     protected void cleanupKeyListeners ()
     {
        // NOT IMPLEMENTED YET
     }
  }
  
  
  1.1                  
jbossmx/src/main/org/jboss/ha/framework/server/DistributedStateImpl.java
  
  Index: DistributedStateImpl.java
  ===================================================================
  package org.jboss.ha.framework.server;
  
  
  import java.util.Vector;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.Collection;
  
  import java.rmi.server.RemoteStub;
  import java.rmi.Remote;
  import java.rmi.server.UnicastRemoteObject;
  
  import java.io.Serializable;
  
  import org.jboss.logging.Logger;
  
  import org.jboss.ha.framework.interfaces.DistributedState;
  import org.jboss.ha.framework.interfaces.HAPartition;
  
  public class DistributedStateImpl 
  implements Remote, DistributedState,
  HAPartition.HAPartitionStateTransfer,
  HAPartition.HAMembershipListener
  {
     protected HashMap categories = new HashMap ();
     protected HashMap keyListeners = new HashMap ();
     protected HAPartition partition;
     
     protected final static String SERVICE_NAME = "DistributedState";
     
     protected Logger log = null;
     
     
     public DistributedStateImpl (HAPartition partition)
     {
        this.partition = partition;
        this.log = Logger.create (this.getClass ());
     }
     
     public void init () throws Exception
     {
        // When we subscribe to state transfer events, GetState will be called to 
initialize
        // this service.
        //
        partition.subscribeToStateTransferEvents (SERVICE_NAME, this);
        partition.registerMembershipListener (this);
        partition.registerRPCHandler (SERVICE_NAME, this);
        
     }
     
     public void start () throws Exception
     {
        // Export myself as an RMI server
        //
        RemoteStub stub = UnicastRemoteObject.exportObject (this);
        
        // bind myself as a replicant service
        //
        partition.getDistributedReplicantManager ().add (SERVICE_NAME, stub);
     }
     
     public void stop () throws Exception
     {
        //partition.getDistributedReplicantManager ().remove (SERVICE_NAME); 
        UnicastRemoteObject.unexportObject (this, true);
     }
     
     /////////////////////
     // State transfer API
     /////////////////////
     public Serializable getCurrentState ()
     {
        return categories;
     }
     
     public void setCurrentState (Serializable newState)
     {
        synchronized (this.categories)
        {
           categories.clear ();
           categories.putAll ((HashMap)newState);
           if (keyListeners.size () > 0)
           {
              cleanupKeyListeners ();
           }
        }
     }
     
     ////////////////////
     // Group membership API
     ////////////////////
     public void membershipChanged (Vector deadMembers, Vector newMembers, Vector 
allMembers)
     {
        // If membership changes, this has no effect on the distributed state
        //
     }
     
     ///////////////
     // DistributedReplicantManager API
     ///////////////
     
     public void _set (String category, String key, Serializable value) throws 
Exception
     {
        synchronized(this.categories)
        {
           HashMap cat = (HashMap)categories.get (category);
           if (cat == null)
           {
              cat = new HashMap ();
              categories.put (category, cat);
           }
           cat.put (key, value);
           notifyKeyListeners (category, key, value);
        }
     }
     
     public void set (String category, String key, Serializable value) throws Exception
     {
        Object[] args =
        {category, key, value};
        partition.callMethodOnCluster (SERVICE_NAME, "_set", args, false);
     }
     
     public void _remove (String category, String key) throws Exception
     {
        synchronized(this.categories)
        {
           HashMap cat = (HashMap)categories.get (category);
           if (cat == null) return;
           Object removed = cat.remove (key);
           if (removed != null)
           {
              notifyKeyListenersOfRemove (category, key, (Serializable)removed);
              if (cat.size () == 0)
              {
                 categories.remove (category);
              }
           }
        }
     }
     
     
     public void remove (String category, String key) throws Exception
     {
        Object[] args =
        {category, key};
        partition.callMethodOnCluster (SERVICE_NAME, "_remove", args, false);
        
     }
     
     public Serializable get (String category, String key)
     {
        synchronized(this.categories)
        {
           HashMap cat = (HashMap)categories.get (category);
           if (cat == null) return null;
           
           return (Serializable)cat.get (key);
        }
     }
     
     public Collection getAllCategories ()
     {
        synchronized(this.categories)
        {
           return categories.keySet ();
        }
     }
     
     public Collection getAllKeys (String category)
     {
        synchronized(this.categories)
        {
           HashMap cat = (HashMap)categories.get (category);
           if (cat == null) return null;
           
           return cat.keySet ();
        }
     }
     
     public Collection getAllValues (String category)
     {
        synchronized(this.categories)
        {
           HashMap cat = (HashMap)categories.get (category);
           if (cat == null) return null;
           
           return cat.values ();
        }
     }
     
     
     public void registerDSListener (String category, DistributedState.DSListener 
subscriber)
     {
        synchronized(this.keyListeners)
        {
           ArrayList listeners = (ArrayList)keyListeners.get (category);
           if (listeners == null)
           {
              listeners = new ArrayList ();
              keyListeners.put (category, listeners);
           }
           listeners.add (subscriber);
        }
     }
     
     public void unregisterDSListener (String category, DistributedState.DSListener 
subscriber)
     {
        synchronized(this.keyListeners)
        {
           ArrayList listeners = (ArrayList)keyListeners.get (category);
           if (listeners == null) return;
           
           listeners.remove (subscriber);
           if (listeners.size () == 0)
           {
              keyListeners.remove (category);
           }
        }
     }
     
     protected void notifyKeyListeners (String category, String key, Serializable 
value)
     {
        synchronized(this.keyListeners)
        {
           ArrayList listeners = (ArrayList)keyListeners.get (category);
           if (listeners == null) return;
           
           for (int i = 0; i < listeners.size (); i++)
           {
              DistributedState.DSListener listener = 
(DistributedState.DSListener)listeners.get (i);
              listener.valueHasChanged (category, key, value);
           }
        }
     }
     
     protected void notifyKeyListenersOfRemove (String category, String key, 
Serializable oldContent)
     {
        synchronized(this.keyListeners)
        {
           ArrayList listeners = (ArrayList)keyListeners.get (category);
           if (listeners == null) return;
           
           for (int i = 0; i < listeners.size (); i++)
           {
              DistributedState.DSListener listener = 
(DistributedState.DSListener)listeners.get (i);
              listener.keyHasBeenRemoved (category, key, oldContent);
           }
        }
     }
     
     protected void cleanupKeyListeners ()
     {
        // NOT IMPLEMENTED YET
     }
  }
  
  
  1.1                  
jbossmx/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
  
  Index: HAPartitionImpl.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE WebOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  package org.jboss.ha.framework.server;
  
  
  import JavaGroups.MethodLookup;
  import JavaGroups.MethodLookupClos;
  import JavaGroups.MethodLookupJava;
  import JavaGroups.MembershipListener;
  import JavaGroups.MessageListener;
  import JavaGroups.JChannel;
  import JavaGroups.Channel;
  import JavaGroups.Message;
  import JavaGroups.MethodCall;
  import JavaGroups.RpcDispatcher;
  import JavaGroups.RspList;
  import JavaGroups.Rsp;
  import JavaGroups.GroupRequest;
  import JavaGroups.View;
  import JavaGroups.Address;
  import JavaGroups.Util;
  
  import java.util.Vector;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  
  import java.io.Serializable;
  
  import javax.naming.InitialContext;
  import javax.naming.Reference;
  import javax.naming.StringRefAddr;
  import javax.naming.Name;
  import javax.naming.Context;
  import javax.naming.NamingException;
  import javax.naming.NameNotFoundException;
  
  import org.jboss.naming.NonSerializableFactory;
  
  import java.lang.reflect.Method;
  
  import org.jboss.logging.Logger;
  
  import org.jboss.ha.framework.interfaces.HAPartition;
  import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
  import org.jboss.ha.framework.interfaces.DistributedState;
  
  
  public class HAPartitionImpl extends RpcDispatcher implements HAPartition, 
MembershipListener, MessageListener
  {
     // RPC handlers keyed by the name given to registerRPCHandler; looked up for
     // the local invocation in callMethodOnCluster.
     protected HashMap rpcHandlers = new HashMap ();
     // State-transfer subscribers keyed by the name given to
     // subscribeToStateTransferEvents; walked by GetState and SetState.
     protected HashMap stateHandlers = new HashMap ();
     // Membership listeners added through registerMembershipListener.
     protected ArrayList listeners = new ArrayList ();
     // Members of the last known view; null until start() or the first ViewAccepted.
     protected Vector members = null;
     
     // Name given to the constructor; also part of the JNDI name "/HAPartition/<name>".
     protected String partitionName;
     // String form of the channel's local address; assigned in start().
     protected String nodeName;
     // Timeout handed to CallRemoteMethods.
     // NOTE(review): 0 presumably means "wait without limit" -- confirm against JavaGroups.
     protected int timeout = 0;
     
     // Channel given to the constructor.
     protected JChannel channel = null;
     
     // Both services are created and initialised in init() and started in start().
     protected DistributedReplicantManagerImpl replicantManager = null;
     protected DistributedStateImpl dsManager = null;
     
     protected Logger log = null;
     
     // Method lookup strategy used for the local Invoke in callMethodOnCluster.
     final MethodLookup  method_lookup_clos=new MethodLookupClos ();
     
     /**
      * Builds a partition on top of an existing channel.
      *
      * The RpcDispatcher superclass is initialised with a fake target object
      * and without listeners; init() installs this object as listener later.
      *
      * The caller's deadlock_detection choice is now forwarded to the
      * dispatcher; it used to be dropped in favour of a hard-coded false.
      * NOTE(review): this assumes the fifth RpcDispatcher constructor argument
      * is the deadlock-detection switch -- confirm against the JavaGroups
      * version in use.
      *
      * @param partitionName      name of the partition, also used in the logger name
      * @param channel            channel the dispatcher works on
      * @param deadlock_detection flag forwarded to the RpcDispatcher constructor
      */
     public HAPartitionImpl (String partitionName, JChannel channel, boolean deadlock_detection) throws Exception
     {
        // init RpcDispatcher with a fake target object
        super(channel, null, null, new Object (), deadlock_detection);
        this.log = Logger.create ("HAPartition:" + partitionName);
        this.channel = channel;
        this.partitionName = partitionName;
     }
     
     /**
      * Prepares the partition: installs this object as membership and message
      * listener, creates and initialises the replicant manager and the
      * distributed state service, then binds the partition in JNDI under
      * "/HAPartition/" followed by the partition name.
      */
     public void init () throws Exception
     {
        // Receive the events generated by the JavaGroups protocol stack.
        log.debug ("SetMembershipListener");
        SetMembershipListener (this);
        log.debug ("SetMessageListener");
        SetMessageListener (this);
        
        // Replicant manager, linked to this partition.
        log.debug ("create replicant manager");
        replicantManager = new DistributedReplicantManagerImpl (this);
        log.debug ("init replicant manager");
        replicantManager.init ();
        log.debug ("bind replicant manager");
        
        // Distributed state service, linked to this partition.
        log.debug ("create distributed state");
        dsManager = new DistributedStateImpl (this);
        log.debug ("init distributed state service");
        dsManager.init ();
        log.debug ("bind distributed state service");
        
        // Publish the partition itself in the public JNDI space.
        Context initialContext = new InitialContext ();
        String jndiName = "/HAPartition/" + partitionName;
        bind (jndiName, this, HAPartitionImpl.class, initialContext);
        
        log.info ("done initing..");
     }
     
     /**
      * Binds a non-serializable object in JNDI.
      *
      * The object is handed to NonSerializableFactory under jndiName, the
      * intermediate contexts of the name are looked up (and created when
      * missing), and a Reference with an "nns" address naming that factory is
      * bound under the last name component.
      *
      * @param jndiName  full JNDI name to bind under
      * @param who       object to publish
      * @param classType class whose name is recorded in the Reference
      * @param ctx       context the name is resolved against
      */
     protected void bind (String jndiName, Object who, Class classType, Context ctx) throws Exception
     {
        // The service is not serializable, so a helper class keeps the real object.
        NonSerializableFactory.bind (jndiName, who);

        // Walk down every component except the last one, creating contexts as needed.
        Name remaining = ctx.getNameParser ("").parse (jndiName);
        for (; remaining.size () > 1; remaining = remaining.getSuffix (1))
        {
           String ctxName = remaining.get (0);
           try
           {
              ctx = (Context)ctx.lookup (ctxName);
           }
           catch (NameNotFoundException e)
           {
              log.info ("creating Subcontext" + ctxName);
              ctx = ctx.createSubcontext (ctxName);
           }
        }
        
        // NonSerializableFactory uses the address type "nns"; the Reference points
        // JNDI at that factory to find the real object.
        StringRefAddr address = new StringRefAddr ("nns", jndiName);
        String factoryClassName = NonSerializableFactory.class.getName ();
        Reference reference = new Reference (classType.getName (), address, factoryClassName, null);
        ctx.bind (remaining.get (0), reference);
     }
     
     /**
      * Starts the partition: records the local node name and the current
      * members from the channel, asks the channel for the group state, then
      * starts the replicant manager and the distributed state service.
      */
     public void start () throws Exception
     {
        // Current JavaGroups group properties.
        log.debug ("get nodeName");
        nodeName = channel.GetLocalAddress ().toString ();
        log.debug ("Get current members");
        members = channel.GetView ().GetMembers ();
        log.info ("Num cluster members: " + members.size ());
        
        // Synchronize the state-transfer subscribers with the group.
        boolean stateRetrieved = channel.GetState (null, 8000);
        if (!stateRetrieved)
        {
           log.info ("State could not be retrieved, (must be first member of group)");
        }
        else
        {
           log.info ("State was retrieved successfully");
        }
        
        // The replicant manager and the distributed state can now be started.
        replicantManager.start ();
        dsManager.start ();
     }
     
     /**
      * Shuts the partition down: stops the replicant manager and the
      * distributed state service, closes the channel, then removes the
      * partition's JNDI binding.
      *
      * A failure in one of the first three steps is reported through the
      * partition logger (instead of being printed to standard error) and does
      * not stop the remaining steps.
      *
      * @throws Exception if the JNDI binding cannot be removed
      */
     public void close () throws Exception
     {
        // Stop the DRM and DS services
        //
        try
        {
           this.replicantManager.stop ();
        }
        catch (Exception e)
        {
           log.error ("Failed to stop the DistributedReplicantManager", e);
        }

        try
        {
           this.dsManager.stop ();
        }
        catch (Exception e)
        {
           log.error ("Failed to stop the DistributedState service", e);
        }
        
        try
        {
           channel.Close ();
        }
        catch (Exception e)
        {
           log.error ("Failed to close the channel", e);
        }
        
        new InitialContext ().unbind ("/HAPartition/" + partitionName);
     }
     
     /** @return the local node name; it is assigned in start(). */
     public String getNodeName ()
     {
        return this.nodeName;
     }
     
     /** @return the partition name given to the constructor. */
     public String getPartitionName ()
     {
        return this.partitionName;
     }
     
     /** @return the replicant manager created in init(). */
     public DistributedReplicantManager getDistributedReplicantManager ()
     {
        return this.replicantManager;
     }
     
     /** @return the distributed state service created in init(). */
     public DistributedState getDistributedStateService ()
     {
        return dsManager;
     }
     
     // ***************************
     // ***************************
     // RPC multicast communication
     // ***************************
     // ***************************
     //
     /**
      * Registers the object that handles calls addressed to objName; an
      * earlier handler registered under the same name is replaced.
      */
     public void registerRPCHandler (String objName, Object subscriber)
     {
        this.rpcHandlers.put (objName, subscriber);
     }
     
     /**
      * Removes whatever handler is registered under objName; the subscriber
      * argument is not consulted.
      */
     public void unregisterRPCHandler (String objName, Object subscriber)
     {
        this.rpcHandlers.remove (objName);
     }
     
     
     /**
      * This function is an abstraction of RpcDispatcher.
      *
      * It calls "objName.methodName" through CallRemoteMethods and collects the
      * responses; a response that is an Rsp is unwrapped to its value. Unless
      * excludeSelf is set, the method is then invoked on the handler registered
      * locally under objName, and a non-null result is appended.
      *
      * @param objName     name the handler was registered under
      * @param methodName  method to invoke on the handlers
      * @param args        arguments of the call
      * @param excludeSelf true to skip the invocation on the local handler
      * @return the collected results, possibly empty, never null
      */
     public ArrayList callMethodOnCluster (String objName, String methodName, Object[] args, boolean excludeSelf) throws Exception
     {
        ArrayList results = new ArrayList ();
        MethodCall call = new MethodCall (objName + "." + methodName, args);

        RspList responses = this.CallRemoteMethods (null, call, GroupRequest.GET_ALL, timeout);
        for (int i = 0; responses != null && i < responses.size (); i++)
        {
           Object response = responses.elementAt (i);
           if (response instanceof Rsp)
           {
              response = ((Rsp)response).GetValue ();
           }
           results.add (response);
        }
        
        if (excludeSelf)
        {
           return results;
        }

        // Local invocation: the call is renamed to the bare method name first.
        call.SetName (methodName);
        Object localHandler = rpcHandlers.get (objName);
        Object localResult = call.Invoke (localHandler, method_lookup_clos);
        if (localResult != null)
        {
           results.add (localResult);
        }
        return results;
     }
     
     // *************************
     // *************************
     // State transfer management
     // *************************
     // *************************
     //
     
     
     /**
      * Registers the subscriber whose state is collected by GetState and
      * restored by SetState under objectName; an earlier subscriber with the
      * same name is replaced.
      */
     public void subscribeToStateTransferEvents (String objectName, HAPartition.HAPartitionStateTransfer subscriber)
     {
        this.stateHandlers.put (objectName, subscriber);
     }
     
     /**
      * Removes whatever subscriber is registered under objectName; the
      * subscriber argument is not consulted.
      */
     public void unsubscribeFromStateTransferEvents (String objectName, HAPartition.HAPartitionStateTransfer subscriber)
     {
        this.stateHandlers.remove (objectName);
     }
     
     
     // MessageListener methods
     //
     /**
      * Builds the "macro" state of the partition: a HashMap holding, for each
      * state-transfer subscriber name, the state that subscriber reports.
      *
      * @return the macro state, or null when collecting it failed (the failure
      *         is logged)
      */
     public Object GetState ()
     {
        log.info ("GetState called.");
        try
        {
           HashMap macroState = new HashMap ();
           for (Iterator names = stateHandlers.keySet ().iterator (); names.hasNext (); )
           {
              String name = (String)names.next ();
              HAPartition.HAPartitionStateTransfer provider =
                 (HAPartition.HAPartitionStateTransfer)stateHandlers.get (name);
              log.debug ("GetState for " + name);
              macroState.put (name, provider.getCurrentState ());
           }
           return macroState;
        }
        catch (Exception ex)
        {
           log.error ("GetState failed", ex);
           return null;
        }
     }
     
     /**
      * Distributes a "macro" state to the state-transfer subscribers: each
      * entry is handed to the subscriber registered under the entry's name.
      * Entries without a subscriber are logged and skipped; a null state is
      * logged and ignored. Failures are logged, not thrown.
      *
      * @param obj a HashMap of subscriber name to sub-state, or null
      */
     public void SetState (Object obj)
     {
        try
        {
           log.info ("SetState called");
           if (obj == null)
           {
              log.info ("state is null");
              return;
           }
           
           HashMap macroState = (HashMap)obj;
           for (Iterator names = macroState.keySet ().iterator (); names.hasNext (); )
           {
              String name = (String)names.next ();
              log.info ("SetState for " + name);
              Object subState = macroState.get (name);
              HAPartition.HAPartitionStateTransfer target =
                 (HAPartition.HAPartitionStateTransfer)stateHandlers.get (name);
              if (target == null)
              {
                 log.info ("There is no stateHandler for: " + name);
                 continue;
              }
              target.setCurrentState ((Serializable)subState);
           }
        }
        catch (Exception ex)
        {
           log.error ("SetState failed", ex);
        }
     }
     
     /**
      * Message callback; the received message is not processed here.
      *
      * @param msg the incoming message (unused)
      */
     public void Receive (Message msg)
     {
        /* complete */
     }
     
     // *************************
     // *************************
     // Group Membership listeners
     // *************************
     // *************************
     //
     
     /**
      * Adds a listener to the list notified by ViewAccepted when the
      * membership changes. The list is modified under its own lock.
      *
      * @param listener the listener to add
      */
     public void registerMembershipListener (HAPartition.HAMembershipListener listener)
     {
        synchronized (listeners)
        {
           listeners.add (listener);
        }
     }
     
     /**
      * Removes a listener from the list notified on membership changes.
      * The list is modified under its own lock.
      *
      * @param listener the listener to remove
      */
     public void unregisterMembershipListener (HAPartition.HAMembershipListener listener)
     {
        synchronized (listeners)
        {
           listeners.remove (listener);
        }
     }
     
     // MembershipListener required methods
     
     /**
      * Called when a group member is suspected; the suspicion is only logged.
      *
      * @param suspected_mbr address of the suspected member
      */
     public void Suspect (Address suspected_mbr)
     {
        /* complete */
        log.info ("Suspected member: " + suspected_mbr);
     }
     
     /**
      * Block notification; no action is taken.
      */
     public void Block ()
     {
     }
     
     public void ViewAccepted (View newView)
     {
        try
        {
           if (this.members == null)
           {
              // Initial viewAccepted
              //
              this.members = newView.GetMembers ();
              return;
           }
           
           Vector oldMembers = this.members;
           Vector allMembers = newView.GetMembers ();
           Vector deadMembers = getDeadMembers (oldMembers, allMembers);
           Vector newMembers = getNewMembers (oldMembers, allMembers);
           this.members = allMembers;
           
           log.info ("membership changed from " + oldMembers.size () + " to " + 
allMembers.size ());
           
           // Broadcast the new view to the view change listeners
           //
           synchronized(this.listeners)
           {
              for (int i = 0; i < listeners.size (); i++)
              {
                 try
                 {
                 ((HAPartition.HAMembershipListener)listeners.get 
(i)).membershipChanged (deadMembers, newMembers, allMembers);
                 }
                 catch (Exception printed)
                 {
                    // a problem in a listener should not prevent other members to 
receive the new view
                    //
                    printed.printStackTrace ();
                 }
              }
           }
        }
        catch (Exception ex)
        {
           log.error ("ViewAccepted failed", ex);
        }
     }
     
     /**
      * Computes the members that left the group.
      *
      * @param oldMembers the previous membership
      * @param newMembers the current membership
      * @return a new Vector holding, in their original order, the elements of
      *         oldMembers that newMembers does not contain
      */
     protected Vector getDeadMembers (Vector oldMembers, Vector newMembers)
     {
        Vector departed = new Vector ();
        for (int pos = 0; pos < oldMembers.size (); pos++)
        {
           Object member = oldMembers.elementAt (pos);
           if (newMembers.contains (member))
           {
              continue;
           }
           departed.add (member);
        }
        return departed;
     }
     
     /**
      * Computes the members that joined the group.
      *
      * @param oldMembers the previous membership
      * @param allMembers the current membership
      * @return a new Vector holding, in their original order, the elements of
      *         allMembers that oldMembers does not contain
      */
     protected Vector getNewMembers (Vector oldMembers, Vector allMembers)
     {
        Vector joined = new Vector ();
        for (int pos = 0; pos < allMembers.size (); pos++)
        {
           Object member = allMembers.elementAt (pos);
           if (oldMembers.contains (member))
           {
              continue;
           }
           joined.add (member);
        }
        return joined;
     }
     
     
     ///////////////////////////////////////////////////////////////
     
     /**
      * Message contains MethodCall. Execute it against *this* object and return 
result.
      * Use MethodCall.Invoke() to do this. Return result.
      *
      * This overrides RpcDispatcher.Handle so that we can dispatch to many different 
objects.
      */
     public Object Handle (Message req)
     {
        Object body = null;
        Object retval = null;
        MethodCall  method_call = null;
        
        if(req == null || req.GetBuffer () == null)
        {
           log.warn ("RpcProtocol.Handle(): message or message buffer is null !");
           return null;
        }
        
        try
        {
           body=Util.ObjectFromByteBuffer (req.GetBuffer ());
        }
        catch(Exception e)
        {
           log.warn ("RpcProtocol.Handle(): " + e);
           return null;
        }
        
        if(body == null || !(body instanceof MethodCall))
        {
           log.warn ("RpcProtocol.Handle(): message does not contain a MethodCall 
object !");
           return null;
        }
        
        // get method call informations
        //
        method_call=(MethodCall)body;
        String methodName = method_call.GetName ();      
        int idx = methodName.indexOf ('.');
        String handlerName = methodName.substring (0, idx);
        String newMethodName = methodName.substring (idx + 1);
        log.info ("Handle: " + methodName);
        
        // prepare method call
        //
        method_call.SetName (newMethodName);
        Object handler = rpcHandlers.get (handlerName);
        
        // Invoke it
        //
        retval=method_call.Invoke (handler, method_lookup_clos);
        
        return retval;
     }
  }
  
  
  1.1                  
jbossmx/src/main/org/jboss/ha/framework/server/HARMIServerImpl.java
  
  Index: HARMIServerImpl.java
  ===================================================================
  package org.jboss.ha.framework.server;
  
  import java.util.Vector;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Hashtable;
  import java.util.Iterator;
  import java.util.Collection;
  import java.rmi.Remote;
  import java.rmi.server.RemoteStub;
  import java.rmi.server.RemoteServer;
  import java.rmi.server.UnicastRemoteObject;
  import java.io.Serializable;
  import java.io.Externalizable;
  import java.io.ObjectInput;
  import java.io.ObjectOutput;
  import java.io.IOException;
  import java.rmi.RemoteException;
  
  import org.jboss.logging.Logger;
  import org.jboss.ejb.plugins.jrmp.interfaces.RemoteMethodInvocation;
  import java.rmi.MarshalledObject;
  import java.lang.reflect.Method;
  import java.lang.reflect.InvocationTargetException;
  import java.lang.reflect.Proxy;
  
  import org.jboss.ha.framework.interfaces.HAPartition;
  import org.jboss.ha.framework.interfaces.HARMIServer;
  import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
  import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
  import org.jboss.ha.framework.interfaces.HARMIResponse;
  import org.jboss.ha.framework.interfaces.HARMIClient;
  import org.jboss.ha.framework.interfaces.HARMIProxy;
  
  /**
   * Server-side endpoint that exposes one handler object through HA-RMI.
   * <p>
   * On construction the instance exports itself with UnicastRemoteObject,
   * registers itself as replicant listener for its replicant name, adds its RMI
   * stub to the partition's DistributedReplicantManager and records itself in
   * HARMIServer.rmiServers under "partitionName/replicantName".
   * <p>
   * The replicant list and its version tag (lastSet) are always read and
   * written together while holding the lock on the replicants list.
   */
  public class HARMIServerImpl
  implements HARMIServer,
  DistributedReplicantManager.ReplicantListener
  {
     protected String replicantName;
     /** Version tag of the replicant list; guarded by the lock on replicants. */
     protected long lastSet = System.currentTimeMillis ();
     /** Current replicants; this very instance is shared with the HA stubs created here. */
     protected ArrayList replicants = new ArrayList ();
     protected Object handler;
     /** Maps the hash of each public method of the handler (as a Long) to its Method. */
     protected HashMap invokerMap = new HashMap ();
     protected HAPartition partition = null;
     protected Logger log;
     protected RemoteStub rmistub;
     protected Object stub;
     protected String key;
     protected Class intf;

     /**
      * Creates the server, exports it and announces it to the partition.
      *
      * @param partition     partition this server belongs to
      * @param replicantName name under which the RMI stub is replicated
      * @param intf          interface implemented by the HA proxies created by createHAStub
      * @param handler       object the incoming invocations are executed on
      * @throws Exception if exporting or registering fails
      */
     public HARMIServerImpl (HAPartition partition, String replicantName, Class intf,
        Object handler) throws Exception
     {
        this.replicantName = replicantName;
        this.partition = partition;
        this.handler = handler;
        this.log = Logger.create (this.getClass ());
        this.intf = intf;
        Method[] methods = handler.getClass ().getMethods ();

        for (int i = 0; i < methods.length; i++)
        {
           invokerMap.put (new Long (RemoteMethodInvocation.calculateHash (methods[i])),
              methods[i]);
        }

        this.rmistub = UnicastRemoteObject.exportObject (this);
        this.key = partition.getPartitionName () + "/" + replicantName;
        partition.getDistributedReplicantManager ().registerListener (replicantName, this);
        partition.getDistributedReplicantManager ().add (replicantName, rmistub);
        HARMIServer.rmiServers.put (key, this);
     }

     /**
      * @return the live replicant list (not a copy)
      */
     public ArrayList getReplicants () throws Exception
     {
        return replicants;
     }

     /**
      * Creates a dynamic proxy implementing the exposed interface and HARMIProxy,
      * backed by a HARMIClient that shares this server's replicant list.
      *
      * @param policy load-balance policy handed to the HARMIClient
      * @return the proxy instance
      */
     public Object createHAStub (LoadBalancePolicy policy)
     {
        HARMIClient client = new HARMIClient (replicants, policy, key, handler);
        return Proxy.newProxyInstance (
           intf.getClassLoader (),
           new Class[] { intf, HARMIProxy.class },
           client);
     }

     /**
      * Undoes what the constructor set up: unregisters the listener, removes the
      * replicant, drops the rmiServers entry and unexports this object.
      * A failure is logged and not propagated.
      */
     public void destroy ()
     {
        try
        {
           this.partition.getDistributedReplicantManager ().unregisterListener (
              this.replicantName, this);
           this.partition.getDistributedReplicantManager ().remove (this.replicantName);

           HARMIServer.rmiServers.remove (key);
           UnicastRemoteObject.unexportObject (this, true);
        }
        catch (Exception e)
        {
           log.error ("destroy failed for " + key, e);
        }
     }

     /**
      * @return the version tag of the current replicant list
      */
     public long getTag ()
     {
        synchronized (replicants)
        {
           return lastSet;
        }
     }

     /**
      * @return the handler object invocations are executed on
      */
     public Object getLocal () throws Exception
     {
        return handler;
     }

     /**
      * Replaces the content of the replicant list and advances the version tag.
      *
      * @param key           key of the replicant whose list changed (only logged)
      * @param newReplicants the new list of replicants
      */
     public void replicantsChanged (String key, ArrayList newReplicants)
     {
        log.info ("replicantsChanged" + key + " to " + newReplicants.size ());
        synchronized(replicants)
        {
           // client has reference to replicants so it will automatically get
           // updated
           replicants.clear ();
           replicants.addAll (newReplicants);
           // The tag must strictly increase on every change, otherwise two
           // changes within the same millisecond would be indistinguishable
           // to a caller comparing tags in invoke().
           lastSet = Math.max (System.currentTimeMillis (), lastSet + 1);
        }
     }

     /**
      * Executes a marshalled method invocation on the handler.
      * If the caller's tag is older than the current one, the response also
      * carries a copy of the current replicant list and the current tag.
      *
      * @param tag  version tag of the replicant list known to the caller
      * @param mimo marshalled RemoteMethodInvocation
      * @return the response holding the invocation result
      * @throws Exception whatever the invoked method threw, or the reflection
      *         failure that prevented the invocation
      */
     public HARMIResponse invoke (long tag, MarshalledObject mimo) throws Exception
     {
        RemoteMethodInvocation rmi = (RemoteMethodInvocation)mimo.get ();
        rmi.setMethodMap (invokerMap);
        Method method = rmi.getMethod ();

        HARMIResponse rsp = new HARMIResponse ();
        // Take the snapshot under the same lock replicantsChanged() uses so the
        // copied list and the tag always belong together.
        synchronized (replicants)
        {
           if (tag < lastSet)
           {
              rsp.newReplicants = new ArrayList (replicants);
              rsp.tag = lastSet;
           }
        }

        try
        {
           rsp.response = method.invoke (handler, rmi.getArguments ());
           return rsp;
        }
        catch (InvocationTargetException ite)
        {
           // Rethrow what the handler threw. The target may be an Error, which
           // must not be cast to Exception (that would raise a
           // ClassCastException and hide the real failure).
           Throwable target = ite.getTargetException ();
           if (target instanceof Exception)
           {
              throw (Exception)target;
           }
           if (target instanceof Error)
           {
              throw (Error)target;
           }
           throw ite;
        }
     }

  }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to