Author: indika
Date: Tue Dec 23 09:11:38 2008
New Revision: 729040

URL: http://svn.apache.org/viewvc?rev=729040&view=rev
Log:
all local states replication methods are removed

Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java?rev=729040&r1=729039&r2=729040&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
 Tue Dec 23 09:11:38 2008
@@ -18,13 +18,12 @@
 */
 package org.apache.synapse.endpoints;
 
-import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.context.Replicator;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.util.Replicator;
 
 import java.util.Date;
 
@@ -84,7 +83,7 @@
      * @param cfgCtx the Axis2 configurationContext for clustering
      */
     public EndpointContext(String endpointName, EndpointDefinition 
endpointDefinition,
-        boolean clustered, ConfigurationContext cfgCtx) {
+                           boolean clustered, ConfigurationContext cfgCtx) {
 
         if (clustered) {
             if (endpointName == null) {
@@ -101,28 +100,29 @@
             this.endpointName = endpointDefinition.toString();
         }
 
-        STATE_KEY                 = KEY_PREFIX + endpointName + STATE;
-        NEXT_RETRY_TIME_KEY       = KEY_PREFIX + endpointName + 
NEXT_RETRY_TIME;
-        REMAINING_RETRIES_KEY     = KEY_PREFIX + endpointName + 
REMAINING_RETRIES;
+        STATE_KEY = KEY_PREFIX + endpointName + STATE;
+        NEXT_RETRY_TIME_KEY = KEY_PREFIX + endpointName + NEXT_RETRY_TIME;
+        REMAINING_RETRIES_KEY = KEY_PREFIX + endpointName + REMAINING_RETRIES;
         LAST_SUSPEND_DURATION_KEY = KEY_PREFIX + endpointName + 
LAST_SUSPEND_DURATION;
     }
 
     /**
      * Update the internal state of the endpoint
+     *
      * @param state the new state of the endpoint
      */
     private void setState(int state) {
 
         if (isClustered) {
-            setAndReplicateState(STATE_KEY, state);
+            Replicator.setAndReplicateState(STATE_KEY, state, cfgCtx);
             switch (state) {
-                case ST_ACTIVE : {
-                    setAndReplicateState(REMAINING_RETRIES_KEY,
-                        definition.getRetriesOnTimeoutBeforeSuspend());
-                    setAndReplicateState(LAST_SUSPEND_DURATION_KEY, null);
+                case ST_ACTIVE: {
+                    Replicator.setAndReplicateState(REMAINING_RETRIES_KEY,
+                            definition.getRetriesOnTimeoutBeforeSuspend(), 
cfgCtx);
+                    Replicator.setAndReplicateState(LAST_SUSPEND_DURATION_KEY, 
null, cfgCtx);
                     break;
                 }
-                case ST_TIMEOUT : {
+                case ST_TIMEOUT: {
                     Integer retries = (Integer) 
cfgCtx.getPropertyNonReplicable(REMAINING_RETRIES_KEY);
                     if (retries == null) {
                         retries = 
definition.getRetriesOnTimeoutBeforeSuspend();
@@ -135,27 +135,27 @@
                         setState(ST_SUSPENDED);
 
                     } else {
-                        setAndReplicateState(REMAINING_RETRIES_KEY, 
(retries-1));
+                        Replicator.setAndReplicateState(REMAINING_RETRIES_KEY, 
(retries - 1), cfgCtx);
                         long nextRetry = System.currentTimeMillis() + 
definition.getRetryDurationOnTimeout();
-                        setAndReplicateState(NEXT_RETRY_TIME_KEY, nextRetry);
+                        Replicator.setAndReplicateState(NEXT_RETRY_TIME_KEY, 
nextRetry, cfgCtx);
 
                         if (log.isDebugEnabled()) {
                             log.debug("Endpoint : " + endpointName + " is 
marked as TIMEOUT and " +
-                                "will be retried : " + (retries-1) + " more 
time/s after : " +
-                                new Date(nextRetry) + " until its marked 
SUSPENDED for failure");
+                                    "will be retried : " + (retries - 1) + " 
more time/s after : " +
+                                    new Date(nextRetry) + " until its marked 
SUSPENDED for failure");
                         }
                     }
                     break;
                 }
-                case ST_SUSPENDED : {
+                case ST_SUSPENDED: {
                     computeNextRetryTimeForSuspended();
                     break;
                 }
                 case ST_OFF: {
                     // mark as in maintenence, and reset all other information
-                    setAndReplicateState(REMAINING_RETRIES_KEY,
-                        definition == null ? -1 : 
definition.getRetriesOnTimeoutBeforeSuspend());
-                    setAndReplicateState(LAST_SUSPEND_DURATION_KEY, null);
+                    Replicator.setAndReplicateState(REMAINING_RETRIES_KEY,
+                            definition == null ? -1 : 
definition.getRetriesOnTimeoutBeforeSuspend(), cfgCtx);
+                    Replicator.setAndReplicateState(LAST_SUSPEND_DURATION_KEY, 
null, cfgCtx);
                     break;
                 }
             }
@@ -169,7 +169,7 @@
                     localLastSuspendDuration = -1;
                     break;
                 }
-                case ST_TIMEOUT : {
+                case ST_TIMEOUT: {
                     int retries = localRemainingRetries;
                     if (retries == -1) {
                         retries = 
definition.getRetriesOnTimeoutBeforeSuspend();
@@ -182,26 +182,26 @@
                         setState(ST_SUSPENDED);
 
                     } else {
-                        localRemainingRetries = retries -1;
+                        localRemainingRetries = retries - 1;
                         localNextRetryTime =
-                            System.currentTimeMillis() + 
definition.getRetryDurationOnTimeout();
+                                System.currentTimeMillis() + 
definition.getRetryDurationOnTimeout();
 
                         if (log.isDebugEnabled()) {
                             log.debug("Endpoint : " + endpointName + " is 
marked as TIMEOUT and " +
-                                "will be retried : " + localRemainingRetries + 
" more time/s after : " +
-                                new Date(localNextRetryTime) + " until its 
marked SUSPENDED for failure");
+                                    "will be retried : " + 
localRemainingRetries + " more time/s after : " +
+                                    new Date(localNextRetryTime) + " until its 
marked SUSPENDED for failure");
                         }
                     }
                     break;
                 }
-                case ST_SUSPENDED : {
+                case ST_SUSPENDED: {
                     computeNextRetryTimeForSuspended();
                     break;
                 }
                 case ST_OFF: {
                     // mark as in maintenence, and reset all other information
                     localRemainingRetries = definition == null ?
-                        -1 : definition.getRetriesOnTimeoutBeforeSuspend();
+                            -1 : definition.getRetriesOnTimeoutBeforeSuspend();
                     localLastSuspendDuration = -1;
                     break;
                 }
@@ -221,14 +221,14 @@
         } else {
             if (localState != ST_ACTIVE && localState != ST_OFF) {
                 log.info("Endpoint : " + endpointName + " currently " + 
getStateAsString() +
-                    " will now be marked active since it processed its last 
message");
+                        " will now be marked active since it processed its 
last message");
                 setState(ST_ACTIVE);
             }
         }
     }
 
     /**
-     *  Endpoint failed processing a message
+     * Endpoint failed processing a message
      */
     public void onFault() {
         log.warn("Endpoint : " + endpointName + " will be marked SUSPENDED as 
it failed");
@@ -236,7 +236,7 @@
     }
 
     /**
-     *  Endpoint timeout processing a message
+     * Endpoint timeout processing a message
      */
     public void onTimeout() {
         if (log.isDebugEnabled()) {
@@ -264,8 +264,8 @@
         }
 
         long nextSuspendDuration = (notYetSuspended ?
-            definition.getInitialSuspendDuration() :
-            (long) (lastSuspendDuration * 
definition.getSuspendProgressionFactor()));
+                definition.getInitialSuspendDuration() :
+                (long) (lastSuspendDuration * 
definition.getSuspendProgressionFactor()));
 
         if (nextSuspendDuration > definition.getSuspendMaximumDuration()) {
             nextSuspendDuration = definition.getSuspendMaximumDuration();
@@ -276,8 +276,8 @@
         long nextRetryTime = System.currentTimeMillis() + nextSuspendDuration;
 
         if (isClustered) {
-            setAndReplicateState(LAST_SUSPEND_DURATION_KEY, 
nextSuspendDuration);
-            setAndReplicateState(NEXT_RETRY_TIME_KEY, nextRetryTime);
+            Replicator.setAndReplicateState(LAST_SUSPEND_DURATION_KEY, 
nextSuspendDuration, cfgCtx);
+            Replicator.setAndReplicateState(NEXT_RETRY_TIME_KEY, 
nextRetryTime, cfgCtx);
         } else {
             localLastSuspendDuration = nextSuspendDuration;
             localNextRetryTime = nextRetryTime;
@@ -285,10 +285,10 @@
 
         if (log.isDebugEnabled()) {
             log.debug("Suspending endpoint : " + endpointName +
-                (notYetSuspended ? " -" :
-                    " - last suspend duration was : " + lastSuspendDuration + 
"ms and") +
-                " current suspend duration is : " + nextSuspendDuration + "ms 
- " +
-                "Next retry after : " + new Date(nextRetryTime));
+                    (notYetSuspended ? " -" :
+                            " - last suspend duration was : " + 
lastSuspendDuration + "ms and") +
+                    " current suspend duration is : " + nextSuspendDuration + 
"ms - " +
+                    "Next retry after : " + new Date(nextRetryTime));
         }
     }
 
@@ -302,11 +302,11 @@
 
         if (log.isDebugEnabled()) {
             log.debug("Checking if endpoint : " + endpointName + " currently 
at state " +
-                getStateAsString() + " can be used now?");
+                    getStateAsString() + " can be used now?");
         }
 
         if (isClustered) {
-            
+
             // gets the value from configuration context (The shared state 
across all instances)
             Integer state = (Integer) 
cfgCtx.getPropertyNonReplicable(STATE_KEY);
             Integer remainingRetries = (Integer) 
cfgCtx.getPropertyNonReplicable(REMAINING_RETRIES_KEY);
@@ -329,17 +329,17 @@
                     // if we are in the ST_TIMEOUT state, reduce a remaining 
retry
                     if (state == ST_TIMEOUT) {
                         remainingRetries--;
-                        setAndReplicateState(REMAINING_RETRIES_KEY, 
remainingRetries);
+                        Replicator.setAndReplicateState(REMAINING_RETRIES_KEY, 
remainingRetries, cfgCtx);
 
                         if (log.isDebugEnabled()) {
                             log.debug("Endpoint : " + endpointName + " 
currently in timeout state" +
-                                " is ready to retry. Remaining retries before 
suspension : " + remainingRetries);
+                                    " is ready to retry. Remaining retries 
before suspension : " + remainingRetries);
                         }
-                        
+
                     } else {
                         if (log.isDebugEnabled()) {
                             log.debug("Endpoint : " + endpointName + " 
currently SUSPENDED," +
-                                " is ready to retry now");
+                                    " is ready to retry now");
                         }
                     }
                     return true;
@@ -364,13 +364,13 @@
 
                     if (log.isDebugEnabled()) {
                         log.debug("Endpoint : " + endpointName + " currently 
in timeout state" +
-                            " is ready to retry. Remaining retries before 
suspension : " + localRemainingRetries);
+                                " is ready to retry. Remaining retries before 
suspension : " + localRemainingRetries);
                     }
 
                 } else {
                     if (log.isDebugEnabled()) {
                         log.debug("Endpoint : " + endpointName + " currently 
SUSPENDED," +
-                            " is ready to retry now");
+                                " is ready to retry now");
                     }
                 }
                 return true;
@@ -379,7 +379,7 @@
 
         if (log.isDebugEnabled()) {
             log.debug("Endpoint : " + endpointName + " not ready and is 
currently : " +
-                getStateAsString() + " Next retry will be after : " + new 
Date(localNextRetryTime));
+                    getStateAsString() + " Next retry will be after : " + new 
Date(localNextRetryTime));
         }
 
         return false;
@@ -414,6 +414,7 @@
 
     /**
      * Private method to return the current state as a loggable string
+     *
      * @return the current state as a string
      */
     private String getStateAsString() {
@@ -434,31 +435,6 @@
     }
 
     /**
-     * Helper method to replicates states of the property with given key
-     * replicates  the given state so that all instances across cluster can 
see this state
-     *
-     * @param key   The key of the property
-     * @param value The value of the property
-     */
-    private void setAndReplicateState(String key, Object value) {
-
-        if (cfgCtx != null && key != null && value != null) {
-
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug("Replicating property key : " + key + " as : " + 
value);
-                }
-                cfgCtx.setProperty(key, value);
-                Replicator.replicate(cfgCtx, new String[]{key});
-
-            } catch (ClusteringFault clusteringFault) {
-                handleException("Error replicating property : " + key + " as : 
" +
-                    value, clusteringFault);
-            }
-        }
-    }
-
-    /**
      * Helper methods for handle errors.
      *
      * @param msg The error message
@@ -468,17 +444,6 @@
         throw new SynapseException(msg);
     }
 
-    /**
-     * Helper methods for handle errors.
-     *
-     * @param msg The error message
-     * @param e   The exception
-     */
-    private void handleException(String msg, Exception e) {
-        log.error(msg, e);
-        throw new SynapseException(msg, e);
-    }
-
     public String toString() {
         StringBuffer sb = new StringBuffer();
         sb.append("[ Name : ").append(endpointName).

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java?rev=729040&r1=729039&r2=729040&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java
 Tue Dec 23 09:11:38 2008
@@ -18,12 +18,11 @@
 */
 package org.apache.synapse.endpoints.algorithms;
 
-import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.context.Replicator;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.util.Replicator;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -42,11 +41,14 @@
     private ConfigurationContext cfgCtx;
 
     /* Are we supporting clustering ? */
-    private Boolean isClusteringEnabled = null;
+    private boolean isClusteringEnabled = false;
 
     /* The key for 'currentEPR' attribute when replicated in a clsuter */
     private String CURRENT_EPR_PROP_KEY;
 
+    /* Prefix for uniquely identify  properties of a particular endpoint  */
+    private String PROPERTY_KEY_PREFIX;
+
     /* The pointer to current epr - The position of the current EPR */
     private int currentEPR = 0;
 
@@ -54,14 +56,16 @@
     private Map<String, Object> localProperties;
 
     public AlgorithmContext(boolean clusteringEnabled, ConfigurationContext 
cfgCtx, String endpointName) {
+
         this.cfgCtx = cfgCtx;
-        if (clusteringEnabled) {
-            isClusteringEnabled = Boolean.TRUE;
-        } else {
-            isClusteringEnabled = Boolean.FALSE;
+        this.isClusteringEnabled = clusteringEnabled;
+
+        if (!clusteringEnabled) {
             localProperties = new HashMap<String, Object>();
+        } else {
+            PROPERTY_KEY_PREFIX = KEY_PREFIX + endpointName;
+            CURRENT_EPR_PROP_KEY = PROPERTY_KEY_PREFIX + CURRENT_EPR;
         }
-        CURRENT_EPR_PROP_KEY = KEY_PREFIX + endpointName + CURRENT_EPR;
     }
 
     /**
@@ -71,7 +75,7 @@
      */
     public int getCurrentEndpointIndex() {
 
-        if (Boolean.TRUE.equals(isClusteringEnabled)) {
+        if (isClusteringEnabled) {
 
             Object value = 
cfgCtx.getPropertyNonReplicable(this.CURRENT_EPR_PROP_KEY);
             if (value == null) {
@@ -92,13 +96,12 @@
      */
     public void setCurrentEndpointIndex(int currentEPR) {
 
-        if (Boolean.TRUE.equals(isClusteringEnabled)) {
+        if (isClusteringEnabled) {
 
             if (log.isDebugEnabled()) {
                 log.debug("Set EPR with key : " + CURRENT_EPR_PROP_KEY + " as 
: " + currentEPR);
             }
-            setAndReplicateState(CURRENT_EPR_PROP_KEY, currentEPR);
-
+            Replicator.setAndReplicateState(CURRENT_EPR_PROP_KEY, currentEPR, 
cfgCtx);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("Setting the current EPR as : " + currentEPR);
@@ -117,53 +120,6 @@
     }
 
     /**
-     * Helper methods for handle errors.
-     *
-     * @param msg The error message
-     */
-    protected void handleException(String msg) {
-        log.error(msg);
-        throw new SynapseException(msg);
-    }
-
-    /**
-     * Helper methods for handle errors.
-     *
-     * @param msg The error message
-     * @param e   The exception
-     */
-    protected void handleException(String msg, Exception e) {
-        log.error(msg, e);
-        throw new SynapseException(msg, e);
-    }
-
-    /**
-     * Helper method to replicates states of the property with given key
-     * Sets property and  replicates the current state  so that all instances
-     * across cluster can see this state
-     *
-     * @param key   The key of the property
-     * @param value The value of the property
-     */
-    private void setAndReplicateState(String key, Object value) {
-
-        if (cfgCtx != null && key != null && value != null) {
-
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug("Replicating property key : " + key + " as : " + 
value);
-                }
-                cfgCtx.setProperty(key, value);
-                Replicator.replicate(cfgCtx, new String[]{key});
-
-            } catch (ClusteringFault clusteringFault) {
-                handleException("Error replicating property : " + key + " as : 
" +
-                    value, clusteringFault);
-            }
-        }
-    }
-
-    /**
      * Get the property value corresponding to a specified key
      *
      * @param key The key of the property
@@ -171,7 +127,7 @@
      */
     public Object getProperty(String key) {
         if (Boolean.TRUE.equals(isClusteringEnabled)) {
-            return cfgCtx.getPropertyNonReplicable(key);
+            return cfgCtx.getPropertyNonReplicable(PROPERTY_KEY_PREFIX + key);
         } else {
             return localProperties.get(key);
         }
@@ -183,14 +139,14 @@
      * In non-clustered environments properties will be stored in a local 
property
      * map.
      *
-     * @param key The key of the property
+     * @param key   The key of the property
      * @param value The value of the property
      */
     public void setProperty(String key, Object value) {
 
         if (key != null && value != null) {
             if (Boolean.TRUE.equals(isClusteringEnabled)) {
-                setAndReplicateState(key, value);
+                Replicator.setAndReplicateState(PROPERTY_KEY_PREFIX + key, 
value, cfgCtx);
             } else {
                 localProperties.put(key, value);
             }


Reply via email to