User: patriot1burke
  Date: 01/09/13 20:53:29

  Added:       src/main/org/jboss/ha RoundRobin.java LoadBalancePolicy.java
                        HARMITarget.java HAPartitionImpl.java
                        HAPartition.java DistributedStateImpl.java
                        DistributedState.java
                        DistributedReplicantManagerImpl.java
                        DistributedReplicantManager.java
                        ClusterPartitionMBean.java ClusterPartition.java
  Log:
  clustering.  Work in Progress.   This code probably doesn't work, but it
  does compile
  
  Revision  Changes    Path
  1.1                  jbossmx/src/main/org/jboss/ha/RoundRobin.java
  
  Index: RoundRobin.java
  ===================================================================
  package org.jboss.ha;
  
  /**
   * Load-balancing policy that cycles through the supplied targets in order.
   *
   * <p>The cursor is advanced <em>before</em> a target is returned, so the
   * first call on a fresh instance returns index 1 when more than one target
   * is available.  The cursor is transient and therefore restarts at 0 after
   * deserialization.
   */
  public class RoundRobin implements LoadBalancePolicy
  {
     /** Index of the target handed out by the most recent call. */
     protected transient int cursorRemote = 0;
  
     /**
      * Picks the next target in rotation.
      *
      * @param targets candidate targets; may be null or empty
      * @return the chosen element, or null when there is nothing to choose from
      */
     public Object chooseTarget(Object[] targets)
     {
        // An empty array would otherwise fail with a division by zero below.
        if (targets == null || targets.length == 0)
           return null;
  
        cursorRemote = ( (cursorRemote + 1) % targets.length );
        return targets[cursorRemote];
     }
  }
  
  
  
  1.1                  jbossmx/src/main/org/jboss/ha/LoadBalancePolicy.java
  
  Index: LoadBalancePolicy.java
  ===================================================================
  package org.jboss.ha;
  
  /**
   * Strategy used to select one target out of a set of candidates.
   * Implementations are serializable.
   */
  public interface LoadBalancePolicy extends java.io.Serializable
  {
     /**
      * Selects one of the given targets.
      *
      * @param targets the candidates to choose from
      * @return the selected candidate
      */
     Object chooseTarget(Object[] targets);
  }
  
  
  
  1.1                  jbossmx/src/main/org/jboss/ha/HARMITarget.java
  
  Index: HARMITarget.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE WebOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.ha;
  
  import java.util.Vector;
  import java.util.ArrayList;
  import java.rmi.ConnectException;
  import java.rmi.ConnectIOException;
  import java.rmi.NoSuchObjectException;
  import java.rmi.UnknownHostException;
  import java.rmi.RemoteException;
  import java.lang.reflect.Method;
  import java.lang.reflect.InvocationTargetException;
  
  
  /**
   *   
   *
   *   @author <a href="mailto:[EMAIL PROTECTED]">Sacha Labourey</a>
   *   @author <a href="mailto:[EMAIL PROTECTED]">Bill Burke</a>
   *   @version $Revision: 1.1 $
   *
   *   <p><b>Revisions:</b>
   *
   *   <p><b>20010831 Bill Burke:</b>
   *   <ul>
   *   <li> First import of sources
   *   </ul>
   */
  
  public class HARMITarget implements java.io.Serializable, 
java.lang.reflect.InvocationHandler
  {
     
     // Delay added to "now" by removeDeadTarget() to schedule the next resynch
     // after a target has been dropped.  Despite the constant-style name this
     // is an instance field overwritten by the constructor.
     protected volatile long DELAY_FOR_RESYNCH_AFTER_DEAD_TARGET = 5000; // 5 seconds
     // Delay between two regular resynchs, used by tick() and updateTargets().
     protected volatile long DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE = 30000; // 30 seconds
     // Timestamp (System.currentTimeMillis() based) from which tick() starts a
     // resynch; transient, so it is 0 after deserialization.
     protected transient volatile long nextResynch;
     // True while the thread started by findTargetsAsynchronously() is running.
     protected transient volatile boolean updatingData = false;
     
     // Elements are cast to DistributedReplicantManager by updateTargets().
     protected ArrayList managers = new ArrayList(0);
     // Candidates handed to the load-balance policy.
     protected Object[] targets = new Object[0];
     // Key passed to lookupReplicants() when the target list is refreshed.
     protected String targetName;
     // Policy that picks one element of targets in getRemoteTarget().
     protected LoadBalancePolicy loadBalancePolicy;
  
     /**
      * Creates a handler and schedules the first regular resynch.
      *
      * @param targetName    key used to look up replicants when refreshing
      * @param normalResynch delay between two regular resynchs
      * @param deathResynch  delay before resynching after a target was dropped
      * @param managers      replicant managers to query; null is treated as empty
      * @param targets       initial targets; null is treated as empty
      * @param policy        policy used to choose among the targets
      */
     public HARMITarget(String targetName, 
                        long normalResynch, 
                        long deathResynch, 
                        ArrayList managers,
                        Object[] targets,
                        LoadBalancePolicy policy)
     {
        this.targetName = targetName;
        DELAY_FOR_RESYNCH_AFTER_DEAD_TARGET = deathResynch;
        DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE = normalResynch;
        nextResynch = System.currentTimeMillis () + DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE;
        updatingData = false;
        // The rest of the class dereferences and locks on these two fields, so
        // a null argument is normalized to an empty container here.
        this.managers = (managers != null) ? managers : new ArrayList(0);
        this.targets = (targets != null) ? targets : new Object[0];
        this.loadBalancePolicy = policy;
     }
     
     /**
      * Returns the list of replicant managers currently held by this handler.
      * The field itself is returned, not a copy.
      */
     public ArrayList getManagers()
     {
        return this.managers;
     }
     
     /**
      * Replaces the list of replicant managers.
      *
      * @param newManagers the list to use from now on
      */
     public void setManagers(ArrayList newManagers)
     {
        // NOTE(review): the monitor is the list being replaced, so later
        // callers lock on a different object - confirm this is intended.
        final Object previous = this.managers;
        synchronized (previous)
        {
           this.managers = newManagers;
        }
     }
  
     /**
      * Returns the array of targets currently held by this handler.
      * The field itself is returned, not a copy.
      */
     public Object[] getTargets()
     {
        return this.targets;
     }
  
     /**
      * Replaces the array of targets.
      *
      * @param newTargets the array to use from now on
      */
     public void setTargets(Object[] newTargets)
     {
        // NOTE(review): the monitor is the array being replaced, so later
        // callers lock on a different object - confirm this is intended.
        final Object previous = this.targets;
        synchronized (previous)
        {
           this.targets = newTargets;
        }
     }
  
     /**
      * Returns the target the next invocation should go to.
      *
      * <p>When the list is empty a synchronous refresh is attempted first.
      *
      * @return the target chosen by the load-balance policy, or null when no
      *         target is available
      */
     public Object getRemoteTarget()
     {
        if (targets.length == 0)
        {
           findTargetsSynchronously ();
           if (targets.length == 0)
              return null;
        }
        synchronized (targets)
        {
           tick();
           // The field may have been replaced (dead-target removal, refresh)
           // since the emptiness check above, so read it once and check again
           // before handing it to the policy.
           Object[] current = targets;
           if (current.length == 0)
              return null;
           return loadBalancePolicy.chooseTarget(current);
        }
     }
     
     /**
      * Reports that an invocation on the given target failed; the target is
      * dropped from the list.
      *
      * @param target the target that could not be reached
      */
     public void remoteTargetHasFailed(Object target)
     {
        this.removeDeadTarget(target);
     }
     
     /**
      * Removes the first occurrence of the given target (compared by
      * identity) from the target array, pulls the next resynch forward and
      * calls tick().  Does nothing when the target is not in the array.
      *
      * @param target the target to drop
      */
     protected void removeDeadTarget (Object target)
     {
        if (targets == null)
           return;
  
        synchronized (targets)
        {
           final int size = targets.length;
           int hit = -1;
           for (int pos = 0; pos < size && hit < 0; pos++)
           {
              if (targets[pos] == target)
                 hit = pos;
           }
           if (hit < 0)
              return; // nothing found
  
           // Copy everything except the slot at "hit".
           Object[] survivors = new Object[size - 1];
           System.arraycopy(targets, 0, survivors, 0, hit);
           int tail = size - hit - 1;
           if (tail > 0)
              System.arraycopy(targets, hit + 1, survivors, hit, tail);
           targets = survivors;
  
           resynchAtLeastAt(System.currentTimeMillis () + DELAY_FOR_RESYNCH_AFTER_DEAD_TARGET);
           tick();
        }
     }
     
     protected void tick()
     {
        if (!updatingData) // this is done in order to do not have to wait on 
asynchronous updates
        {
           boolean resynch = false;
           synchronized (this)
           {
              long now = System.currentTimeMillis ();
              resynch = (now >= nextResynch);
              if (resynch)
              {
                 //System.out.println("Tick has decided to resynch. Next auto resynh 
at " + Long.toString(now + DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE ) + " (now is " + 
Long.toString (now) + " and old update value was: " + Long.toString(nextResynch) + 
")");
                 nextResynch = ( now + DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE ) ;
              }
           }
           if (resynch)
              findTargetsAsynchronously ();
        }
     }
           
     
     /**
      * Moves the next resynch to the given time if none is scheduled yet
      * (value 0) or if the given time is earlier than the scheduled one.
      *
      * @param when proposed resynch time
      */
     protected void resynchAtLeastAt(long when)
     {
        // Only the more urgent of the two deadlines is kept.
        boolean nothingScheduled = (nextResynch == 0);
        if (nothingScheduled || when < nextResynch)
        {
           nextResynch = when;
        }
     }
     
     /**
      * Refreshes the target and manager lists from the first replicant
      * manager that answers.  Managers are tried in list order; a manager
      * whose lookup throws is skipped.  If none answers, nothing changes.
      */
     protected void updateTargets()
     {
        int index = 0;
        while (index < this.managers.size())
        {
           try
           {
              DistributedReplicantManager supplier = (DistributedReplicantManager)managers.get(index);
              ArrayList replicants = supplier.lookupReplicants(targetName);
              ArrayList newManagers = supplier.lookupReplicants("DistributedReplicantManager");
              long regular = System.currentTimeMillis () + DELAY_FOR_RESYNCH_FOR_NORMAL_UPDATE;
              resynchAtLeastAt(regular);
              synchronized (targets)
              {
                 this.targets = replicants.toArray();
              }
              synchronized(managers)
              {
                 this.managers = newManagers;
              }
              return;
           }
           catch (Exception ignored)
           {
              // Deliberately ignored: move on to the next manager.
           }
           index++;
        }
     }
  
  
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
     {
        Object target = getRemoteTarget();
        while (target != null)
        {
           try
           {
              // Is this step actually necessary?  Can I just do method.invoke(target, 
args); ?
              Method m = target.getClass().getMethod(method.getName(), 
method.getParameterTypes());
              return m.invoke(target, args);
           }
           catch (IllegalAccessException iae)
           {
              throw iae;
           }
           catch (IllegalArgumentException iae)
           {
              throw iae;
           }
           catch (InvocationTargetException ite)
           {
              Throwable targetException = ite.getTargetException();
              if (targetException instanceof ConnectException
                  || targetException instanceof ConnectIOException
                  || targetException instanceof NoSuchObjectException
                  || targetException instanceof UnknownHostException)
              {
                 // ignore
              }
              else
              {
                 throw targetException;
              }
  
           }
           // If we reach here, this means that we must fail-over
           remoteTargetHasFailed(target);
           target = getRemoteTarget();
        }
        // if we get here this means list was exhausted
        throw new RemoteException("Service unavailable.");
        
     }
  
     protected void findTargetsSynchronously()
     {
        // if this is called, it is because our target list is empty
        //
        //System.out.println("findTargetsSynchronously has been called");
        synchronized (this)
        {
           if (targets.length > 0)
              return; // the list has been probably refilled during our wait for the 
monitor
           
           updateTargets ();
        }
        
     }
     
     /**
      * Refreshes the target list in a newly started thread.  The
      * updatingData flag is raised before the thread starts and cleared when
      * the refresh ends, whether or not it succeeded.
      */
     protected void findTargetsAsynchronously()
     {
        updatingData = true;
        Thread t = new Thread ( new Runnable () 
           {
              public void run ()
              {
                 try
                 {
                    // Lock the enclosing HARMITarget, i.e. the same monitor
                    // findTargetsSynchronously() uses.  A plain "this" here
                    // would be the Runnable and exclude nobody.
                    synchronized(HARMITarget.this)
                    {
                       updateTargets ();
                    }
                 } finally
                 {
                    updatingData = false;
                 }
              }
           });
        t.start();
     }
  }
  
  
  
  
  1.1                  jbossmx/src/main/org/jboss/ha/HAPartitionImpl.java
  
  Index: HAPartitionImpl.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE WebOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  package org.jboss.ha;
  
  import JavaGroups.MethodLookup;
  import JavaGroups.MethodLookupClos;
  import JavaGroups.MembershipListener;
  import JavaGroups.MessageListener;
  import JavaGroups.JChannel;
  import JavaGroups.Channel;
  import JavaGroups.Message;
  import JavaGroups.MethodCall;
  import JavaGroups.RpcDispatcher;
  import JavaGroups.RspList;
  import JavaGroups.GroupRequest;
  import JavaGroups.View;
  import JavaGroups.Address;
  import JavaGroups.Util;
  import java.util.Vector;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.io.Serializable;
  import javax.naming.InitialContext;
  
  public class HAPartitionImpl extends RpcDispatcher implements HAPartition, 
MembershipListener, MessageListener
  {
     // Handler name -> object that receives RPC calls addressed to that name.
     protected HashMap rpcHandlers = new HashMap();
     // Object name -> HAPartition.HAPartitionStateTransfer subscriber.
     protected HashMap stateHandlers = new HashMap();
     // Registered HAPartition.HAMembershipListener instances; guarded by
     // synchronizing on the list itself.
     protected ArrayList listeners = new ArrayList();
     protected String partitionName;
     // Members of the last accepted view; null until ViewAccepted() has run.
     protected Vector members = null;
     // Timeout passed to CallRemoteMethods (presumably milliseconds - confirm).
     protected int timeout = 500;
     protected JChannel channel = null;
     // Created by init().
     protected DistributedReplicantManagerImpl replicantManager;
     // String form of the channel's local address, set by the constructor.
     protected String nodeName;
     // Lookup strategy handed to MethodCall.Invoke().
     final MethodLookup  method_lookup_clos=new MethodLookupClos();
  
     public HAPartitionImpl(String partitionName, JChannel channel,
                            boolean deadlock_detection) throws Exception
     {
        super(channel, null, null, null, deadlock_detection);
        this.channel = channel;
        this.partitionName = partitionName;
        this.nodeName = channel.GetLocalAddress().toString();
     }
  
     /**
      * Registers this object as membership and message listener, creates and
      * initializes the replicant manager, and binds this partition in JNDI
      * under "/HAPartition/" followed by the partition name.
      */
     public void init() throws Exception
     {
        SetMembershipListener(this);
        SetMessageListener(this);
  
        DistributedReplicantManagerImpl manager = new DistributedReplicantManagerImpl(this);
        this.replicantManager = manager;
        manager.init();
  
        String jndiName = "/HAPartition/" + partitionName;
        new InitialContext().bind(jndiName, this);
     }
  
     /**
      * Closes the channel and removes this partition's JNDI binding.
      * The binding is removed even when closing the channel fails.
      */
     public void close() throws Exception
     {
        try
        {
           channel.Close();
        }
        finally
        {
           // Must run regardless, otherwise a stale binding is left behind.
           new InitialContext().unbind("/HAPartition/" + partitionName);
        }
     }
  
     /** Returns the name of this node as set by the constructor. */
     public String getNodeName()
     {
        return this.nodeName;
     }
  
     /** Returns the name this partition was created with. */
     public String getPartitionName()
     {
        return this.partitionName;
     }
  
     /** Returns the replicant manager created by init(); null before that. */
     public DistributedReplicantManager getDistributedReplicantManager()
     {
        return this.replicantManager;
     }
  
  
     // ***************************
     // ***************************
     // RPC multicast communication
     // ***************************
     // ***************************
     //
     /**
      * Registers the object that receives RPC calls addressed to objName,
      * replacing any handler previously registered under that name.
      */
     public void registerRPCHandler(String objName, Object subscriber)
     {
        this.rpcHandlers.put(objName, subscriber);
     }
  
     /**
      * Removes whatever handler is registered under objName.  The subscriber
      * argument is not consulted.
      */
     public void unregisterRPCHandler(String objName, Object subscriber)
     {
        this.rpcHandlers.remove(objName);
     }
  
  
     /**
      * This function is an abstraction of RpcDispatcher.  It sends the call
      * "objName.methodName" to the group and collects the responses.
      *
      * @param objName     name the receiving handler is registered under
      * @param methodName  method to invoke on that handler
      * @param args        call arguments
      * @param excludeSelf when false, the handler registered locally under
      *                    objName is also invoked and its result is appended
      * @return the responses, followed by the local result if requested
      */
     public ArrayList callMethodOnCluster(String objName, String methodName, Object[] args, boolean excludeSelf) throws Exception
     {
        MethodCall m = new MethodCall(objName + "." + methodName, args);
        RspList rsp = this.CallRemoteMethods(null, m, GroupRequest.GET_ALL, timeout);
        ArrayList rtn = new ArrayList(rsp.size() + 1);
        for (int i = 0; i < rsp.size(); i++)
           rtn.add(rsp.elementAt(i));
        
        if (!excludeSelf)
        {
           Object handler = rpcHandlers.get(objName);
           // The wire-level call is named "objName.methodName"; Handle()
           // strips the prefix before invoking.  The local call must use the
           // bare method name as well, or the lookup on the handler fails.
           MethodCall local = new MethodCall(methodName, args);
           Object retval=local.Invoke(handler, method_lookup_clos);
           rtn.add(retval);
        }
        return rtn;
     }
  
     // *************************
     // *************************
     // State transfer management
     // *************************
     // *************************
     //
  
  
     /**
      * Registers a state-transfer subscriber under objectName and then asks
      * the channel for the group state so the new subscriber gets
      * synchronized.
      */
     public void subscribeToStateTransferEvents(String objectName, HAPartition.HAPartitionStateTransfer subscriber)
     {
        this.stateHandlers.put(objectName, subscriber);
        // The boolean result of GetState is deliberately not used.
        // NOTE(review): a false result presumably means this is the first
        // member of the group - confirm.
        channel.GetState(null, 8000);
     }
  
     /**
      * Removes the state-transfer subscriber registered under objectName.
      * The subscriber argument is not consulted.
      */
     public void unsubscribeFromStateTransferEvents(String objectName, HAPartition.HAPartitionStateTransfer subscriber)
     {
        this.stateHandlers.remove(objectName);
     }
  
  
     // MessageListener methods
     /**
      * Collects the current state of every registered state-transfer
      * subscriber.
      *
      * @return a HashMap from subscriber name to that subscriber's state
      */
     public Object GetState()
     {
        HashMap collected = new HashMap();
        for (Iterator it = stateHandlers.entrySet().iterator(); it.hasNext(); )
        {
           java.util.Map.Entry entry = (java.util.Map.Entry)it.next();
           String name = (String)entry.getKey();
           HAPartition.HAPartitionStateTransfer source = (HAPartition.HAPartitionStateTransfer)entry.getValue();
           collected.put(name, source.getCurrentState());
        }
        return collected;
     }
  
     /**
      * Distributes a state map, as produced by GetState(), to the local
      * subscribers.
      *
      * @param obj a HashMap from subscriber name to state; null is ignored
      */
     public void SetState(Object obj)
     {
        if (obj == null)
           return; // no state to apply
  
        HashMap state = (HashMap)obj;
        Iterator keys = state.keySet().iterator();
        while (keys.hasNext())
        {
           String key = (String)keys.next();
           HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
           // The sender may have subscribers this node has not registered
           // (yet); their part of the state is skipped.
           if (subscriber == null)
              continue;
           subscriber.setCurrentState((Serializable)state.get(key));
        }
     }
  
     // MessageListener callback for incoming messages; currently a no-op.
     public void Receive(Message msg) { /* complete */}
  
     // *************************
     // *************************
     // Group Membership listeners
     // *************************
     // *************************
     //
  
     /**
      * Adds a listener that is told about every accepted view change.
      */
     public void registerMembershipListener(HAPartition.HAMembershipListener listener)
     {
        final ArrayList registry = this.listeners;
        synchronized (registry)
        {
           registry.add(listener);
        }
     }
  
     /**
      * Removes the first registration of the given listener, if any.
      */
     public void unregisterMembershipListener(HAPartition.HAMembershipListener listener)
     {
        final ArrayList registry = this.listeners;
        synchronized (registry)
        {
           // remove(Object) drops the first equal element and is a no-op
           // when the listener is not registered.
           registry.remove(listener);
        }
     }
  
     // MembershipListener required methods
  
     // MembershipListener callback for a suspected member; currently a no-op.
     public void Suspect(Address suspected_mbr)
     {
        // complete
     }
  
     // MembershipListener callback; intentionally empty.
     public void Block() {}
  
     /**
      * Records the members of the new view and tells every registered
      * membership listener which members disappeared, which are new, and
      * what the complete member list is.
      *
      * @param newView the view that was accepted
      */
     public void ViewAccepted(View newView)
     {
        // No view has been seen before the first call, so the previous
        // member list is treated as empty.
        Vector oldMembers = (this.members != null) ? this.members : new Vector();
        Vector allMembers = newView.GetMembers();
        Vector deadMembers = getDeadMembers(oldMembers, allMembers);
        Vector newMembers = getNewMembers(oldMembers, allMembers);
        this.members = allMembers;
        synchronized(this.listeners)
        {
           for (int i = 0; i < listeners.size(); i++)
           {
              HAPartition.HAMembershipListener listener = (HAPartition.HAMembershipListener)listeners.get(i);
              listener.membershipChanged(deadMembers, newMembers, allMembers);
           }
        }
     }
  
     /**
      * Returns the members of the old list that are missing from the new one.
      *
      * @param oldMembers previous member list; null is treated as empty
      * @param newMembers current member list; null is treated as empty
      * @return a new Vector holding the departed members, in old-list order
      */
     protected Vector getDeadMembers (Vector oldMembers, Vector newMembers)
     {
        Vector dead = new Vector ();
        if (oldMembers == null)
           return dead; // nobody was known, so nobody can have left
  
        for (int i=0; i<oldMembers.size ();i++)
        {
           Object member = oldMembers.elementAt(i);
           if (newMembers == null || !newMembers.contains (member))
              dead.add (member);
        }
        return dead;
     }
  
     /**
      * Returns the members of the current list that were not in the old one.
      *
      * @param oldMembers previous member list; null is treated as empty
      * @param allMembers current member list; null is treated as empty
      * @return a new Vector holding the joined members, in current-list order
      */
     protected Vector getNewMembers (Vector oldMembers, Vector allMembers)
     {
        Vector newMembers = new Vector ();
        if (allMembers == null)
           return newMembers;
  
        for (int i=0; i<allMembers.size ();i++)
        {
           Object member = allMembers.elementAt(i);
           if (oldMembers == null || !oldMembers.contains(member))
              newMembers.add (member);
        }
        return newMembers;
     }
  
  
     ///////////////////////////////////////////////////////////////
     
     /**
        Message contains MethodCall. Execute it against *this* object and return 
result.
        Use MethodCall.Invoke() to do this. Return result.
  
        This overrides RpcDispatcher.Handle so that we can dispatch to many different 
objects.
     */
     public Object Handle(Message req) 
     {
        Object      body=null, retval=null;
        MethodCall  method_call;
  
  
        if(server_obj == null) {
           System.err.println("RpcDispatcher.Handle(): no method handler is registered 
! " +
                              "Discarding request.");
           return null;
        }
  
        if(req == null || req.GetBuffer() == null) {
           System.err.println("RpcProtocol.Handle(): message or message buffer is null 
!");
           return null;
        }
  
        try {
           body=Util.ObjectFromByteBuffer(req.GetBuffer());
        }
        catch(Exception e) {
           System.err.println("RpcProtocol.Handle(): " + e);
           return null;
        }
  
        if(body == null || !(body instanceof MethodCall)) {
           System.err.println("RpcProtocol.Handle(): message does not contain a 
MethodCall object !");
           return null;
        }
  
        method_call=(MethodCall)body;
        String methodName = method_call.GetName();
        int idx = methodName.indexOf('.');
        String handlerName = methodName.substring(0, idx);
        String newMethodName = methodName.substring(idx + 1);
        method_call.SetName(newMethodName);
        Object handler = rpcHandlers.get(handlerName);
        retval=method_call.Invoke(handler, method_lookup_clos);
        return retval;
     }
     
  
  }
  
  
  
  1.1                  jbossmx/src/main/org/jboss/ha/HAPartition.java
  
  Index: HAPartition.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE WebOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  package org.jboss.ha;
  
  import java.util.ArrayList;
  import java.util.Vector;
  import java.io.Serializable;
  
  /**
   * A cluster partition: gives access to node and partition names, RPC
   * calls across the partition, state transfer between nodes, and
   * notification of membership changes.
   */
  public interface HAPartition
  {
     // ---------------------------------------------------------------
     // General methods
     // ---------------------------------------------------------------
  
     /** Returns the name of the local node. */
     String getNodeName ();
  
     /** Returns the name of this partition. */
     String getPartitionName();
  
     /** Returns the replicant manager attached to this partition. */
     DistributedReplicantManager getDistributedReplicantManager();
     
     // ---------------------------------------------------------------
     // RPC multicast communication
     // ---------------------------------------------------------------
  
     /** Registers the object that handles RPC calls addressed to objectName. */
     void registerRPCHandler(String objectName, Object handler);
  
     /** Removes the RPC handler registered under objectName. */
     void unregisterRPCHandler(String objectName, Object subscriber);
  
     /**
      * Calls a method on the handler registered under objectName.  The call
      * goes to the members of this partition on all nodes only (not to
      * subpartitions or other partitions).
      *
      * @return the collected results
      */
     ArrayList callMethodOnCluster(String objectName, String methodName, Object[] args, boolean excludeSelf) throws Exception;
     
     // ---------------------------------------------------------------
     // State transfer management
     // ---------------------------------------------------------------
     
     /** Implemented by services whose state is transferred between nodes. */
     interface HAPartitionStateTransfer
     {
        /** Returns the state to hand to another node. */
        Serializable getCurrentState ();
  
        /** Installs state received from another node. */
        void setCurrentState(Serializable newState);
     }
     
     /** Registers a state-transfer subscriber under objectName. */
     void subscribeToStateTransferEvents (String objectName, HAPartition.HAPartitionStateTransfer subscriber);
  
     /** Removes the state-transfer subscriber registered under objectName. */
     void unsubscribeFromStateTransferEvents (String objectName, HAPartition.HAPartitionStateTransfer subscriber);
  
     // ---------------------------------------------------------------
     // Group Membership listeners
     // ---------------------------------------------------------------
  
     /** Implemented by objects interested in membership changes. */
     interface HAMembershipListener
     {
        /**
         * Called when the membership of the partition has changed.
         *
         * @param deadMembers members that left
         * @param newMembers  members that joined
         * @param allMembers  complete current member list
         */
        void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers);
     }
  
     /** Adds a membership listener. */
     void registerMembershipListener(HAMembershipListener listener);
  
     /** Removes a membership listener. */
     void unregisterMembershipListener(HAMembershipListener listener);
  
  }
  
  
  
  1.1                  jbossmx/src/main/org/jboss/ha/DistributedStateImpl.java
  
  Index: DistributedStateImpl.java
  ===================================================================
  package org.jboss.ha;
  
  import java.util.Vector;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.Collection;
  import java.rmi.server.RemoteServer;
  import java.rmi.server.UnicastRemoteObject;
  import java.io.Serializable;
  
  public class DistributedStateImpl extends RemoteServer
     implements DistributedState,
                HAPartition.HAPartitionStateTransfer,
                HAPartition.HAMembershipListener
  {
     // Category name -> HashMap of key -> value; guarded by synchronizing on
     // the map itself.
     protected HashMap categories = new HashMap();
     // Category name -> ArrayList of DistributedState.DSListener; guarded by
     // synchronizing on the map itself.
     protected HashMap keyListeners = new HashMap();
     // Partition used for state transfer, membership events and RPC.
     protected HAPartition partition;
  
     // Name under which this service registers with the partition.
     protected final static String SERVICE_NAME = "DistributedState";
  
     /**
      * Creates the service on top of the given partition.  Nothing is
      * registered with the partition until init() is called.
      */
     public DistributedStateImpl(HAPartition partition)
     {
        super();
        this.partition = partition;
     }
  
     public void init() throws Exception
     {
        // When we subscribe to state transfer events, GetState will be called to 
initialize
        // this service.
        partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
        partition.registerMembershipListener(this);
        partition.registerRPCHandler(SERVICE_NAME, this);
  
        // Export myself as an RMI server
        UnicastRemoteObject.exportObject(this, 0);
  
        // should bind myself as a replicant service? TODO
     }
  
     /////////////////////
     // State transfer API
     /////////////////////
     /**
      * Returns a snapshot of all categories for state transfer.
      *
      * <p>Both the outer map and each per-category map are copied while the
      * lock used by the mutators is held, so the caller can serialize the
      * result while other threads keep modifying the state.
      *
      * @return a HashMap from category name to a HashMap of key to value
      */
     public Serializable getCurrentState()
     {
        synchronized (this.categories)
        {
           HashMap snapshot = new HashMap();
           Iterator names = categories.keySet().iterator();
           while (names.hasNext())
           {
              Object name = names.next();
              Object content = categories.get(name);
              snapshot.put(name, (content instanceof HashMap) ? new HashMap((HashMap)content) : content);
           }
           return snapshot;
        }
     }
  
     /**
      * Replaces the local categories with the transferred state.
      *
      * @param newState a HashMap from category name to category content
      */
     public void setCurrentState(Serializable newState)
     {
        synchronized (this.categories)
        {
           this.categories.clear();
           this.categories.putAll((HashMap)newState);
           if (!(keyListeners.size() <= 0))
              cleanupKeyListeners();
        }
     }
  
     ////////////////////
     // Group membership API
     ////////////////////
     // Membership callback; intentionally empty for this service.
     public void membershipChanged(Vector deadMembers, Vector newMembers, Vector 
allMembers)
     {
        // If membership changes, this has no effect on the distributed state
        //
     }
  
     ///////////////
     // DistributedState API
     ///////////////
  
     /**
      * Local half of set(): stores the value under key in the category,
      * creating the category on first use, then notifies the category's
      * listeners.  This is the method name set() sends to the cluster.
      */
     public void _set(String category, String key, Serializable value) throws Exception
     {
        synchronized(this.categories)
        {
           HashMap entries = (HashMap)this.categories.get(category);
           if (entries == null)
           {
              entries = new HashMap();
              this.categories.put (category, entries);
           }
           entries.put(key, value);
           this.notifyKeyListeners(category, key, value);
        }
     }
  
     /**
      * Associates a value with a key in a category on the cluster by calling
      * "_set" through the partition, this node included.
      */
     public void set(String category, String key, Serializable value) throws Exception
     {
        Object[] callArgs = new Object[] { category, key, value };
        this.partition.callMethodOnCluster(SERVICE_NAME, "_set", callArgs, false);
     }
  
     /**
      * Local half of remove(): deletes the key from the category.  When a
      * value was actually removed the category's listeners are notified, and
      * a category left empty is dropped.
      */
     public void _remove(String category, String key) throws Exception
     {
        synchronized(this.categories)
        {
           HashMap entries = (HashMap)this.categories.get(category);
           if (entries == null)
              return;
  
           Object previous = entries.remove(key);
           if (previous == null)
              return;
  
           this.notifyKeyListenersOfRemove(category, key, (Serializable)previous);
           if (entries.size() == 0)
              this.categories.remove(category);
        }
     }
  
  
     /**
      * Removes a key from a category on the cluster by calling "_remove"
      * through the partition, this node included.
      */
     public void remove(String category, String key) throws Exception
     {
        Object[] callArgs = new Object[] { category, key };
        this.partition.callMethodOnCluster(SERVICE_NAME, "_remove", callArgs, false);
     }
  
     /**
      * Returns the value stored locally under key in the category.
      *
      * @return the value, or null when the category or the key is unknown
      */
     public Serializable get(String category, String key)
     {
        synchronized(this.categories)
        {
           HashMap entries = (HashMap)this.categories.get(category);
           return (entries == null) ? null : (Serializable)entries.get(key);
        }
     }
  
     /**
      * Returns the names of all categories.
      *
      * @return a copy taken under the lock; unlike the map's key-set view it
      *         is serializable and unaffected by later changes
      */
     public Collection getAllCategories()
     {
        synchronized(this.categories)
        {
           return new ArrayList(categories.keySet());
        }
     }
  
     /**
      * Returns all keys of a category.
      *
      * @return a copy taken under the lock (serializable, unaffected by
      *         later changes), or null when the category is unknown
      */
     public Collection getAllKeys(String category)
     {
        synchronized(this.categories)
        {
           HashMap cat = (HashMap)categories.get(category);
           if (cat == null) return null;
  
           return new ArrayList(cat.keySet());
        }
     }
  
     /**
      * Returns all values of a category.
      *
      * @return a copy taken under the lock (serializable, unaffected by
      *         later changes), or null when the category is unknown
      */
     public Collection getAllValues(String category)
     {
        synchronized(this.categories)
        {
           HashMap cat = (HashMap)categories.get(category);
           if (cat == null) return null;
  
           return new ArrayList(cat.values());
        }
     }
  
  
     /**
      * Adds a listener for changes to the keys of a category.
      */
     public void registerDSListener(String category, DistributedState.DSListener subscriber)
     {
        synchronized(this.keyListeners)
        {
           ArrayList subscribers = (ArrayList)this.keyListeners.get(category);
           if (subscribers != null)
           {
              subscribers.add(subscriber);
              return;
           }
           // First listener for this category.
           subscribers = new ArrayList();
           subscribers.add(subscriber);
           this.keyListeners.put (category, subscribers);
        }
     }
  
     /**
      * Removes the first registration of a listener from a category; the
      * category's listener list is dropped once it becomes empty.
      */
     public void unregisterDSListener(String category, DistributedState.DSListener subscriber)
     {
        synchronized(this.keyListeners)
        {
           ArrayList subscribers = (ArrayList)this.keyListeners.get(category);
           if (subscribers == null)
              return;
  
           boolean wasRegistered = subscribers.remove(subscriber);
           if (wasRegistered && subscribers.size() == 0)
              this.keyListeners.remove(category);
        }
     }
  
     /**
      * Tells every listener of the category that key now maps to value.
      */
     protected void notifyKeyListeners(String category, String key, Serializable value)
     {
        synchronized(this.keyListeners)
        {
           ArrayList subscribers = (ArrayList)this.keyListeners.get(category);
           if (subscribers != null)
           {
              int position = 0;
              while (position < subscribers.size())
              {
                 DistributedState.DSListener current = (DistributedState.DSListener)subscribers.get(position);
                 current.valueHasChanged(category, key, value);
                 position++;
              }
           }
        }
     }
  
     /**
      * Tells every listener of the category that key has been removed,
      * passing along the value it used to hold.
      */
     protected void notifyKeyListenersOfRemove(String category, String key, Serializable oldContent)
     {
        synchronized(this.keyListeners)
        {
           ArrayList subscribers = (ArrayList)this.keyListeners.get(category);
           if (subscribers != null)
           {
              int position = 0;
              while (position < subscribers.size())
              {
                 DistributedState.DSListener current = (DistributedState.DSListener)subscribers.get(position);
                 current.keyHasBeenRemoved (category, key, oldContent);
                 position++;
              }
           }
        }
     }
  
  
     // Called by setCurrentState() after new state has been installed while
     // listeners are registered; currently a no-op.
     protected void cleanupKeyListeners()
     {
        // NOT IMPLEMENTED YET
     }
  
  }
  
  
  
  
  
  1.1                  jbossmx/src/main/org/jboss/ha/DistributedState.java
  
  Index: DistributedState.java
  ===================================================================
  package org.jboss.ha;
  
  import java.util.Collection;
  import java.io.Serializable;
  import java.rmi.Remote;
  
  /**
   * A table of serializable values organized as category / key / value,
   * with change notification per category.
   */
  public interface DistributedState extends Remote
  {
     /**
      * When a particular key in the DistributedState table gets modified,
      * all listeners registered for the key's category are notified.
      * Keys are organized in categories.
      */
     interface DSListener
     {
        /** Called when key in category has been given a value. */
        void valueHasChanged(String category, String key, Serializable value);
  
        /** Called when key has been removed from category. */
        void keyHasBeenRemoved (String category, String key, Serializable previousContent);
     }
  
     /** Adds a listener for the given category. */
     void registerDSListener(String category, DSListener subscriber);
  
     /** Removes a listener from the given category. */
     void unregisterDSListener(String category, DSListener subscriber);
  
     // State binding methods
     //
  
     /**
      * Removes the key, and the value it maps to, from the category.
      */
     void remove(String category, String key) throws Exception;
  
     /**
      * Associates a value to a key in a specific category.
      */
     void set(String category, String key, Serializable value) throws Exception;
  
     /**
      * Looks up the value associated with a key in a specific category.
      */
     Serializable get(String category, String key);
  
     /**
      * Returns all category names.
      */
     Collection getAllCategories();
  
     /**
      * Returns all keys in a category.
      */
     Collection getAllKeys(String category);
  
     /**
      * Returns all values in a category.
      */
     Collection getAllValues(String category);
  
  }
  
  
  
  1.1                  
jbossmx/src/main/org/jboss/ha/DistributedReplicantManagerImpl.java
  
  Index: DistributedReplicantManagerImpl.java
  ===================================================================
  package org.jboss.ha;
  
  import java.util.Vector;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.Collection;
  import java.rmi.server.RemoteServer;
  import java.rmi.server.UnicastRemoteObject;
  import java.io.Serializable;
  
  public class DistributedReplicantManagerImpl extends RemoteServer
     implements DistributedReplicantManager,
                HAPartition.HAPartitionStateTransfer,
                HAPartition.HAMembershipListener
  {
     // Key -> HashMap of node name -> replicant (membershipChanged() removes
     // entries by node name); guarded by synchronizing on the map itself.
     protected HashMap replicants = new HashMap();
     // Listener registry consulted by setCurrentState().
     protected HashMap keyListeners = new HashMap();
     // Partition used for state transfer, membership events and RPC.
     protected HAPartition partition;
  
     // Name under which this service registers with the partition.
     protected final static String SERVICE_NAME = "DistributedReplicantManager";
  
     /**
      * Creates the manager on top of the given partition.  Nothing is
      * registered with the partition until init() is called.
      */
     public DistributedReplicantManagerImpl(HAPartition partition)
     {
        super();
        this.partition = partition;
     }
  
     public void init() throws Exception
     {
        // When we subscribe to state transfer events, GetState will be called to 
initialize
        // this service.
        partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
        partition.registerMembershipListener(this);
        partition.registerRPCHandler(SERVICE_NAME, this);
  
        // Export myself as an RMI server
        UnicastRemoteObject.exportObject(this, 0);
  
        // Register myself as a replicant of DistributedReplicantManager
        add(SERVICE_NAME, this);
     }
  
     /////////////////////
     // State transfer API
     /////////////////////
     public Serializable getCurrentState()
     {
        return replicants;
     }
  
     public void setCurrentState(Serializable newState)
     {
        synchronized (this.replicants)
        {
           replicants.clear();
           replicants.putAll((HashMap)newState);
           if (keyListeners.size() > 0)
           {
              cleanupKeyListeners();
           }
        }
     }
  
     ////////////////////
     // Group membership API
     ////////////////////
     public void membershipChanged(Vector deadMembers, Vector newMembers, Vector 
allMembers)
     {
        // Here we only care about deadMembers.  Purge all replicant lists of 
deadMembers
        // and then notify all listening nodes.
  
        synchronized (this.replicants)
        {
           Iterator keys = replicants.keySet().iterator();
           while (keys.hasNext())
           {
              String key = (String)keys.next();
              HashMap replicant = (HashMap)replicants.get(key);
              boolean modified = false;
              for (int i = 0; i < deadMembers.size(); i++)
              {
                 String node = deadMembers.elementAt(i).toString();
                 Object removed = replicant.remove(node);
                 if (removed != null) modified = true;
              }
              if (modified)
              {
                 notifyKeyListeners(key, new ArrayList(replicant.values()));
              }
           }
        }
     }
  
     ///////////////
     // DistributedReplicantManager API
     ///////////////
  
     public void _add(String key, String nodeName, Serializable replicant)
     {
        synchronized(this.replicants)
        {
           HashMap rep = (HashMap)replicants.get(key);
           if (rep == null)
           {
                rep = new HashMap();
                replicants.put (key, rep);
           }
           rep.put(nodeName, replicant);
           notifyKeyListeners(key, new ArrayList(rep.values()));
        }
     }
  
     public void add(String key, Serializable replicant) throws Exception
     {
        Object[] args = {key, partition.getNodeName(), replicant};
        partition.callMethodOnCluster(SERVICE_NAME, "_add", args, false);
     }
  
     public void _remove(String key, String nodeName)
     {
        synchronized(this.replicants)
        {
           HashMap replicant = (HashMap)replicants.get(key);
           if (replicant == null) return;
           Object removed = replicant.remove(nodeName);
           if (removed != null)
           {
              Collection values = replicant.values();
              notifyKeyListeners(key, new ArrayList(values));
              if (values.size() == 0)
              {
                 replicants.remove(key);
              }
           }
        }
     }
  
     public void remove(String key) throws Exception
     {
        Object[] args = {key, partition.getNodeName()};
        partition.callMethodOnCluster(SERVICE_NAME, "_remove", args, false);
  
     }
  
     public Serializable lookupLocalReplicant(String key)
     {
        synchronized(this.replicants)
        {
           HashMap replicant = (HashMap)replicants.get(key);
           if (replicant == null) return null;
  
           return (Serializable)replicant.get(partition.getNodeName());
        }
     }
  
     public ArrayList lookupReplicants(String key)
     {
        synchronized(this.replicants)
        {
           HashMap replicant = (HashMap)replicants.get(key);
           if (replicant == null) return null;
  
           return new ArrayList(replicant.values());
        }
     }
  
  
     public void registerListener(String key, 
DistributedReplicantManager.ReplicantListener subscriber)
     {
        synchronized(this.keyListeners)
        {
           ArrayList listeners = (ArrayList)keyListeners.get(key);
           if (listeners == null)
           {
                listeners = new ArrayList();
                keyListeners.put (key, listeners);
           }
           listeners.add(subscriber);
        }
     }
  
     public void unregisterListener(String key, 
DistributedReplicantManager.ReplicantListener subscriber)
     {
        synchronized(this.keyListeners)
        {
           ArrayList listeners = (ArrayList)keyListeners.get(key);
           if (listeners == null) return;
           int idx = listeners.indexOf(subscriber);
           if (idx != -1)
           {
              listeners.remove(idx);
              if (listeners.size() == 0)
              {
                 keyListeners.remove(key);
              }
           }
        }
     }
  
     protected void notifyKeyListeners(String key, ArrayList newReplicants)
     {
        synchronized(this.keyListeners)
        {
           ArrayList listeners = (ArrayList)keyListeners.get(key);
           if (listeners == null) return;
  
           for (int i = 0; i < listeners.size(); i++)
           {
              DistributedReplicantManager.ReplicantListener listener = 
(DistributedReplicantManager.ReplicantListener)listeners.get(i);
              listener.replicantsChanged(key, newReplicants);
           }
        }
     }
  
     protected void cleanupKeyListeners()
     {
        // NOT IMPLEMENTED YET
     }
  
  }
  
  
  
  
  
  1.1                  jbossmx/src/main/org/jboss/ha/DistributedReplicantManager.java
  
  Index: DistributedReplicantManager.java
  ===================================================================
  package org.jboss.ha;
  
  import java.util.ArrayList;
  
  import java.io.Serializable;
  import java.rmi.Remote;
  
  /**
   * Table of replicants addressed by key, with change notification per key.
   * Members are implicitly public, as in any interface.
   */
  public interface DistributedReplicantManager extends Remote
  {
     /**
      * Callback for key changes.  When a particular key in the
      * DistributedReplicantManager table gets modified, all listeners
      * registered for that key are notified of the replicant changes.
      */
     interface ReplicantListener
     {
        /**
         * @param key the key whose replicants changed
         * @param newReplicants the replicant list of that key after the change
         */
        void replicantsChanged(String key, ArrayList newReplicants);
     }

     // Listener management

     /** Subscribes a listener to the changes of one key. */
     void registerListener(String key, ReplicantListener subscriber);

     /** Cancels a subscription made for one key. */
     void unregisterListener(String key, ReplicantListener subscriber);

     // State binding methods

     /**
      * Adds a replicant; it will be attached to this cluster node.
      *
      * @throws Exception if the replicant cannot be added
      */
     void add(String key, Serializable replicant) throws Exception;

     /**
      * Removes the key from the replication service.
      * NOTE(review): the implementation in this commit removes only the
      * calling node's replicant for the key - confirm the intended scope.
      *
      * @throws Exception if the removal fails
      */
     void remove(String key) throws Exception;

     /**
      * Looks up the replicant attached to this cluster node.
      */
     Serializable lookupLocalReplicant(String key);

     /**
      * Returns a list of all replicants registered under a key.
      */
     ArrayList lookupReplicants(String key);

  }
  
  
  
  1.1                  jbossmx/src/main/org/jboss/ha/ClusterPartitionMBean.java
  
  Index: ClusterPartitionMBean.java
  ===================================================================
  package org.jboss.ha;
  
  /**
   * Management attributes of a cluster partition service: its name, its
   * protocol property string and the deadlock-detection switch.
   */
  public interface ClusterPartitionMBean
     extends org.jboss.system.ServiceMBean
  {
     /**
      * Object name string for this MBean.
      * NOTE(review): the value says "HANaming" although this interface
      * describes a cluster partition - confirm this is intended.
      */
     String OBJECT_NAME = ":service=HANaming";

     /** Name of the partition. */
     String getPartitionName();
     void setPartitionName(String newName);

     /** Property string used to configure the partition. */
     String getPartitionProperties();
     void setPartitionProperties(String newProps);

     /** Whether deadlock detection is switched on. */
     boolean getDeadlockDetection();
     void setDeadlockDetection(boolean doit);
  }
  
      
      
  
  
  
  1.1                  jbossmx/src/main/org/jboss/ha/ClusterPartition.java
  
  Index: ClusterPartition.java
  ===================================================================
  package org.jboss.ha;
  
  import JavaGroups.JChannel;
  import JavaGroups.Channel;
  
  import org.jboss.logging.Logger;
  import org.jboss.system.ServiceMBeanSupport;
  
  import javax.management.MBeanServer;
  import javax.management.ObjectName;
  import javax.management.MalformedObjectNameException;
  import org.apache.log4j.Category;
  
  /**
   * MBean service that opens a JavaGroups channel with the configured
   * protocol properties, connects it under the partition name and wraps it
   * in an {@link HAPartitionImpl}.
   */
  public class ClusterPartition extends ServiceMBeanSupport
     implements ClusterPartitionMBean
  {
     /** Name the channel connects under; also returned by getName(). */
     protected String partitionName = "DefaultPartition";
     /** Protocol stack description handed to the JChannel constructor. */
     protected String jgProps =
        "UDP(mcast_addr=224.100.100.200;mcast_port=4567;ip_ttl=31;trace=true):" +
        "PING(timeout=3000;num_initial_members=10):" +
        "FD(trace=true;timeout=5000):" +
        "VERIFY_SUSPECT(trace=false;timeout=1500):" +
        "pbcast.NAKACK(trace=true):" +
        "UNICAST(timeout=5000;min_wait_time=2000):" +
        "FRAG:" +
        "pbcast.GMS:" +
        "pbcast.STATE_TRANSFER(trace=true)";
     /** Created by startService(); null until the service started successfully. */
     protected HAPartitionImpl partition;
     protected boolean deadlock_detection = false;


     public String getName()
     {
        return partitionName;
     }

     public String getPartitionName()
     {
        return partitionName;
     }

     public void setPartitionName(String newName)
     {
        partitionName = newName;
     }

     public String getPartitionProperties()
     {
        return jgProps;
     }

     public void setPartitionProperties(String newProps)
     {
        jgProps = newProps;
     }

     public boolean getDeadlockDetection()
     {
        return deadlock_detection;
     }

     public void setDeadlockDetection(boolean doit)
     {
        deadlock_detection = doit;
     }

     /**
      * Always answers with the fixed OBJECT_NAME; both arguments are ignored.
      *
      * @throws javax.management.MalformedObjectNameException if OBJECT_NAME
      *         is not a valid object name
      */
     public ObjectName getObjectName(MBeanServer server, ObjectName name)
        throws javax.management.MalformedObjectNameException
     {
        return new ObjectName(OBJECT_NAME);
     }

     /**
      * Creates and connects the channel, then creates and initializes the
      * partition on top of it.
      * NOTE(review): if a later step throws, the already created channel is
      * not closed here - confirm whether it needs explicit cleanup.
      *
      * @throws Exception if the channel or the partition cannot be set up
      */
     public void startService()
        throws Exception
     {
        log.info("Starting ClusterPartition: " + partitionName);
        JChannel channel = new JChannel(jgProps);
        // Use the shared Boolean constants instead of allocating new ones
        channel.SetOpt(Channel.LOCAL, Boolean.FALSE);
        channel.SetOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
        channel.Connect(partitionName);
        partition = new HAPartitionImpl(partitionName, channel, deadlock_detection);
        partition.init();
     }

     /**
      * Closes the partition if one was created.  Failures are logged, never
      * propagated.
      */
     public void stopService()
     {
        try
        {
           log.info("Stopping ClusterPartition: " + partitionName);
           // startService may have failed before the partition existed;
           // avoid logging a misleading NullPointerException in that case.
           if (partition != null)
           {
              partition.close();
           }
        }
        catch (Exception e)
        {
           log.error("Exception during shutdown", e);
        }
     }
  }
  
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to